From a47141d34465770b9ff5794668d974c7b07c4c60 Mon Sep 17 00:00:00 2001 From: Ho 229 Date: Tue, 11 Jun 2024 22:11:27 +0800 Subject: [PATCH] fix: rename and fetch_data progress report --- examples/sftp/src/main.rs | 134 +++++++++++--------------------------- src/filter/info.rs | 14 ++-- src/filter/proxy.rs | 6 +- 3 files changed, 49 insertions(+), 105 deletions(-) diff --git a/examples/sftp/src/main.rs b/examples/sftp/src/main.rs index b34d9fe..8a88488 100644 --- a/examples/sftp/src/main.rs +++ b/examples/sftp/src/main.rs @@ -18,14 +18,14 @@ use wincs::{ filter::{info, ticket, SyncFilter}, placeholder_file::{Metadata, PlaceholderFile}, request::Request, - CloudErrorKind, PopulationType, Registration, SecurityId, SyncRootIdBuilder, + CloudErrorKind, HydrationType, PopulationType, Registration, SecurityId, SyncRootIdBuilder, }; // max should be 65536, this is done both in term-scp and sshfs because it's the // max packet size for a tcp connection const DOWNLOAD_CHUNK_SIZE_BYTES: usize = 4096; // doesn't have to be 4KiB aligned -const UPLOAD_CHUNK_SIZE_BYTES: usize = 4096; +// const UPLOAD_CHUNK_SIZE_BYTES: usize = 4096; const PROVIDER_NAME: &str = "wincs"; const DISPLAY_NAME: &str = "Sftp"; @@ -59,7 +59,7 @@ fn main() { let u16_display_name = U16String::from_str(DISPLAY_NAME); Registration::from_sync_root_id(&sync_root_id) .display_name(&u16_display_name) - .hydration_type(wincs::HydrationType::Full) + .hydration_type(HydrationType::Full) .population_type(PopulationType::Full) .icon( U16String::from_str("%SystemRoot%\\system32\\charmap.exe"), @@ -119,48 +119,6 @@ pub struct Filter { } impl Filter { - pub fn create_file(&self, src: &Path, dest: &Path) -> Result<(), SftpError> { - let mut client_file = File::open(src)?; - // TODO: This will overwrite the file if it exists on the server - let mut server_file = self.sftp.create(dest)?; - - let mut buffer = [0; UPLOAD_CHUNK_SIZE_BYTES]; - let mut bytes_written = 0; - - // TODO: I could do the little offset trick and moving the old bytes to the - // beginning of the buffer, I just don't know if it's worth it - loop { - client_file.seek(SeekFrom::Start(bytes_written))?; - match client_file.read(&mut buffer) { - Ok(0) => break, - Ok(bytes_read) => { - bytes_written += server_file.write(&buffer[0..bytes_read])? as u64; - } - Err(err) if err.kind() == io::ErrorKind::Interrupted => {} - Err(err) => return Err(SftpError::Io(err)), - } - } - - Ok(()) - } - - // TODO: src is full, dest is relative - pub fn create_dir_all(&self, src: &Path, dest: &Path) -> Result<(), SftpError> { - // TODO: what does the "o" mean in 0o775 - self.sftp.mkdir(dest, 0o775)?; - - for entry in fs::read_dir(src)? { - let src = entry?.path(); - let dest = dest.join(src.file_name().unwrap()); - match src.is_dir() { - true => self.create_dir_all(&src, &dest)?, - false => self.create_file(&src, &dest)?, - } - } - - Ok(()) - } - pub fn remove_dir_all(&self, dest: &Path) -> Result<(), ssh2::Error> { for entry in self.sftp.readdir(dest)? { match entry.0.is_dir() { @@ -173,28 +131,27 @@ impl Filter { } } -// TODO: handle unwraps -// TODO: everything is just forwarded to external functions... This should be -// changed in the wrapper api impl SyncFilter for Filter { - // TODO: handle unwraps fn fetch_data(&self, request: Request, ticket: ticket::FetchData, info: info::FetchData) { - println!("fetch_data {:?}", request.file_blob()); - // TODO: handle unwrap let path = Path::new(unsafe { OsStr::from_encoded_bytes_unchecked(request.file_blob()) }); let range = info.required_file_range(); let end = range.end; let mut position = range.start; - // TODO: allow callback to return Result in SyncFilter + println!( + "fetch_data {:?} {:?} {}", + path, + range, + info.interrupted_hydration() + ); + let res = || -> Result<(), _> { let mut server_file = self .sftp .open(path) .map_err(|_| CloudErrorKind::InvalidRequest)?; let mut client_file = BufWriter::with_capacity(4096, request.placeholder()); - server_file .seek(SeekFrom::Start(position)) .map_err(|_| CloudErrorKind::InvalidRequest)?; @@ -208,12 +165,10 @@ impl SyncFilter for Filter { // into segments done on separate threads // transfer the data in chunks loop { - client_file.get_ref().set_progress(end, position).unwrap(); - // TODO: read directly to the BufWriters buffer // TODO: ignore if the error was just interrupted let bytes_read = server_file - .read(&mut buffer[0..DOWNLOAD_CHUNK_SIZE_BYTES]) + .read(&mut buffer) .map_err(|_| CloudErrorKind::InvalidRequest)?; let bytes_written = client_file .write(&buffer[0..bytes_read]) @@ -223,6 +178,8 @@ impl SyncFilter for Filter { if position >= end { break; } + + client_file.get_ref().set_progress(end, position).unwrap(); } client_file @@ -241,7 +198,6 @@ impl SyncFilter for Filter { println!("deleted"); } - // TODO: I probably also have to delete the file from the disk fn delete(&self, request: Request, ticket: ticket::Delete, info: info::Delete) { println!("delete {:?}", request.path()); let path = Path::new(unsafe { OsStr::from_encoded_bytes_unchecked(request.file_blob()) }); @@ -264,44 +220,32 @@ impl SyncFilter for Filter { } } - // TODO: Do I have to move the file and set the file progress? or does the OS - // handle that? (I think I do) fn rename(&self, request: Request, ticket: ticket::Rename, info: info::Rename) { let res = || -> Result<(), _> { - match info.target_in_scope() { - true => { - // TODO: path should auto include the drive letter - let src = request.path(); - // TODO: should be relative - let dest = info.target_path(); - - match info.source_in_scope() { - // TODO: use fs::copy or fs::rename, whatever it is to move the local files, - // then use ConvertToPlaceholder. I'm not sure if I have to do this recursively - // for each file or only the top-level folder TODO: which - // rename flags do I use? how do I know if I should be overwriting? - true => self - .sftp - .rename(&src, &dest, None) - .map_err(|_| CloudErrorKind::InvalidRequest)?, - false => match info.is_directory() { - true => self - .create_dir_all(&src, &dest) - .map_err(|_| CloudErrorKind::InvalidRequest)?, - false => self - .create_file(&src, &dest) - .map_err(|_| CloudErrorKind::InvalidRequest)?, - }, - } + let src = request.path(); + let dest = info.target_path(); + let base = get_client_path(); + + match (info.source_in_scope(), info.target_in_scope()) { + (true, true) => { + self.sftp + .rename( + &src.strip_prefix(&base).unwrap(), + &dest.strip_prefix(&base).unwrap(), + None, + ) + .map_err(|_| CloudErrorKind::InvalidRequest)?; } - // TODO: do I need to delete it locally? - false => self - .sftp - .unlink(Path::new(unsafe { - OsStr::from_encoded_bytes_unchecked(request.file_blob()) - })) - .map_err(|_| CloudErrorKind::InvalidRequest)?, + (true, false) => { + fs::copy(&src, dest).map_err(|_| CloudErrorKind::InvalidRequest)?; + self.sftp + .unlink(&src.strip_prefix(&base).unwrap()) + .map_err(|_| CloudErrorKind::InvalidRequest)?; + } + (false, true) => Err(CloudErrorKind::NotSupported)?, // TODO + (false, false) => Err(CloudErrorKind::InvalidRequest)?, } + ticket.pass().unwrap(); Ok(()) }(); @@ -327,7 +271,7 @@ impl SyncFilter for Filter { let parent = absolute.strip_prefix(&client_path).unwrap(); let dirs = self.sftp.readdir(parent).unwrap(); - let placeholders = dirs + let mut placeholders = dirs .into_iter() .filter(|(path, _)| !Path::new(&client_path).join(path).exists()) .map(|(path, stat)| { @@ -348,13 +292,13 @@ impl SyncFilter for Filter { .last_write_time(stat.mtime.unwrap_or_default()) .change_time(stat.mtime.unwrap_or_default()), ) + .mark_sync() .overwrite() - // .mark_sync() // need this? .blob(path.into_os_string().into_encoded_bytes()) }) .collect::>(); - ticket.pass_with_placeholder(placeholders).unwrap(); + ticket.pass_with_placeholder(&mut placeholders).unwrap(); } fn closed(&self, request: Request, info: info::Closed) { @@ -401,8 +345,6 @@ impl SyncFilter for Filter { fn renamed(&self, _request: Request, _info: info::Renamed) { println!("renamed"); } - - // TODO: acknowledgement callbacks } #[derive(Error, Debug)] diff --git a/src/filter/info.rs b/src/filter/info.rs index ffcc50a..c9f6740 100644 --- a/src/filter/info.rs +++ b/src/filter/info.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, ops::Range, path::PathBuf}; +use std::{ffi::OsString, fmt::Debug, ops::Range, path::PathBuf}; use widestring::U16CStr; use windows::Win32::Storage::CloudFilters::{ @@ -266,10 +266,10 @@ pub struct Deleted(pub(crate) CF_CALLBACK_PARAMETERS_0_4); /// Information for the [SyncFilter::rename][crate::SyncFilter::rename] callback. #[derive(Debug)] -pub struct Rename(pub(crate) CF_CALLBACK_PARAMETERS_0_10); +pub struct Rename(pub(crate) CF_CALLBACK_PARAMETERS_0_10, pub(crate) OsString); impl Rename { - /// Whether or not the placeholder being deleted is a directory. + /// Whether or not the placeholder being renamed is a directory. pub fn is_directory(&self) -> bool { (self.0.Flags & CloudFilters::CF_CALLBACK_RENAME_FLAG_IS_DIRECTORY).0 != 0 } @@ -286,11 +286,9 @@ impl Rename { /// The full path the placeholder is being moved to. pub fn target_path(&self) -> PathBuf { - unsafe { - U16CStr::from_ptr_str(self.0.TargetPath.0) - .to_os_string() - .into() - } + let mut path = PathBuf::from(&self.1); + path.push(unsafe { U16CStr::from_ptr_str(self.0.TargetPath.0) }.to_os_string()); + path } } diff --git a/src/filter/proxy.rs b/src/filter/proxy.rs index baf6a2f..49b19cb 100644 --- a/src/filter/proxy.rs +++ b/src/filter/proxy.rs @@ -232,8 +232,12 @@ pub unsafe extern "system" fn notify_rename( if let Some(filter) = filter_from_info::(info) { let request = Request::new(*info); let ticket = ticket::Rename::new(request.connection_key(), request.transfer_key()); + let info = info::Rename( + (*params).Anonymous.Rename, + request.volume_letter().to_os_string(), + ); - filter.rename(request, ticket, info::Rename((*params).Anonymous.Rename)); + filter.rename(request, ticket, info); } }