Skip to content

Commit

Permalink
fix: rename and fetch_data progress report
Browse files Browse the repository at this point in the history
  • Loading branch information
ho-229 committed Jun 11, 2024
1 parent 0599278 commit a47141d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 105 deletions.
134 changes: 38 additions & 96 deletions examples/sftp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use wincs::{
filter::{info, ticket, SyncFilter},
placeholder_file::{Metadata, PlaceholderFile},
request::Request,
CloudErrorKind, PopulationType, Registration, SecurityId, SyncRootIdBuilder,
CloudErrorKind, HydrationType, PopulationType, Registration, SecurityId, SyncRootIdBuilder,
};

// max should be 65536, this is done both in term-scp and sshfs because it's the
// max packet size for a tcp connection
const DOWNLOAD_CHUNK_SIZE_BYTES: usize = 4096;
// doesn't have to be 4KiB aligned
const UPLOAD_CHUNK_SIZE_BYTES: usize = 4096;
// const UPLOAD_CHUNK_SIZE_BYTES: usize = 4096;

const PROVIDER_NAME: &str = "wincs";
const DISPLAY_NAME: &str = "Sftp";
Expand Down Expand Up @@ -59,7 +59,7 @@ fn main() {
let u16_display_name = U16String::from_str(DISPLAY_NAME);
Registration::from_sync_root_id(&sync_root_id)
.display_name(&u16_display_name)
.hydration_type(wincs::HydrationType::Full)
.hydration_type(HydrationType::Full)
.population_type(PopulationType::Full)
.icon(
U16String::from_str("%SystemRoot%\\system32\\charmap.exe"),
Expand Down Expand Up @@ -119,48 +119,6 @@ pub struct Filter {
}

impl Filter {
pub fn create_file(&self, src: &Path, dest: &Path) -> Result<(), SftpError> {
let mut client_file = File::open(src)?;
// TODO: This will overwrite the file if it exists on the server
let mut server_file = self.sftp.create(dest)?;

let mut buffer = [0; UPLOAD_CHUNK_SIZE_BYTES];
let mut bytes_written = 0;

// TODO: I could do the little offset trick and moving the old bytes to the
// beginning of the buffer, I just don't know if it's worth it
loop {
client_file.seek(SeekFrom::Start(bytes_written))?;
match client_file.read(&mut buffer) {
Ok(0) => break,
Ok(bytes_read) => {
bytes_written += server_file.write(&buffer[0..bytes_read])? as u64;
}
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
Err(err) => return Err(SftpError::Io(err)),
}
}

Ok(())
}

// TODO: src is full, dest is relative
pub fn create_dir_all(&self, src: &Path, dest: &Path) -> Result<(), SftpError> {
// TODO: what does the "o" mean in 0o775
self.sftp.mkdir(dest, 0o775)?;

for entry in fs::read_dir(src)? {
let src = entry?.path();
let dest = dest.join(src.file_name().unwrap());
match src.is_dir() {
true => self.create_dir_all(&src, &dest)?,
false => self.create_file(&src, &dest)?,
}
}

Ok(())
}

pub fn remove_dir_all(&self, dest: &Path) -> Result<(), ssh2::Error> {
for entry in self.sftp.readdir(dest)? {
match entry.0.is_dir() {
Expand All @@ -173,28 +131,27 @@ impl Filter {
}
}

// TODO: handle unwraps
// TODO: everything is just forwarded to external functions... This should be
// changed in the wrapper api
impl SyncFilter for Filter {
// TODO: handle unwraps
fn fetch_data(&self, request: Request, ticket: ticket::FetchData, info: info::FetchData) {
println!("fetch_data {:?}", request.file_blob());
// TODO: handle unwrap
let path = Path::new(unsafe { OsStr::from_encoded_bytes_unchecked(request.file_blob()) });

let range = info.required_file_range();
let end = range.end;
let mut position = range.start;

// TODO: allow callback to return Result in SyncFilter
println!(
"fetch_data {:?} {:?} {}",
path,
range,
info.interrupted_hydration()
);

let res = || -> Result<(), _> {
let mut server_file = self
.sftp
.open(path)
.map_err(|_| CloudErrorKind::InvalidRequest)?;
let mut client_file = BufWriter::with_capacity(4096, request.placeholder());

server_file
.seek(SeekFrom::Start(position))
.map_err(|_| CloudErrorKind::InvalidRequest)?;
Expand All @@ -208,12 +165,10 @@ impl SyncFilter for Filter {
// into segments done on separate threads
// transfer the data in chunks
loop {
client_file.get_ref().set_progress(end, position).unwrap();

// TODO: read directly to the BufWriters buffer
// TODO: ignore if the error was just interrupted
let bytes_read = server_file
.read(&mut buffer[0..DOWNLOAD_CHUNK_SIZE_BYTES])
.read(&mut buffer)
.map_err(|_| CloudErrorKind::InvalidRequest)?;
let bytes_written = client_file
.write(&buffer[0..bytes_read])
Expand All @@ -223,6 +178,8 @@ impl SyncFilter for Filter {
if position >= end {
break;
}

client_file.get_ref().set_progress(end, position).unwrap();
}

client_file
Expand All @@ -241,7 +198,6 @@ impl SyncFilter for Filter {
println!("deleted");
}

// TODO: I probably also have to delete the file from the disk
fn delete(&self, request: Request, ticket: ticket::Delete, info: info::Delete) {
println!("delete {:?}", request.path());
let path = Path::new(unsafe { OsStr::from_encoded_bytes_unchecked(request.file_blob()) });
Expand All @@ -264,44 +220,32 @@ impl SyncFilter for Filter {
}
}

// TODO: Do I have to move the file and set the file progress? or does the OS
// handle that? (I think I do)
fn rename(&self, request: Request, ticket: ticket::Rename, info: info::Rename) {
let res = || -> Result<(), _> {
match info.target_in_scope() {
true => {
// TODO: path should auto include the drive letter
let src = request.path();
// TODO: should be relative
let dest = info.target_path();

match info.source_in_scope() {
// TODO: use fs::copy or fs::rename, whatever it is to move the local files,
// then use ConvertToPlaceholder. I'm not sure if I have to do this recursively
// for each file or only the top-level folder TODO: which
// rename flags do I use? how do I know if I should be overwriting?
true => self
.sftp
.rename(&src, &dest, None)
.map_err(|_| CloudErrorKind::InvalidRequest)?,
false => match info.is_directory() {
true => self
.create_dir_all(&src, &dest)
.map_err(|_| CloudErrorKind::InvalidRequest)?,
false => self
.create_file(&src, &dest)
.map_err(|_| CloudErrorKind::InvalidRequest)?,
},
}
let src = request.path();
let dest = info.target_path();
let base = get_client_path();

match (info.source_in_scope(), info.target_in_scope()) {
(true, true) => {
self.sftp
.rename(
&src.strip_prefix(&base).unwrap(),
&dest.strip_prefix(&base).unwrap(),
None,
)
.map_err(|_| CloudErrorKind::InvalidRequest)?;
}
// TODO: do I need to delete it locally?
false => self
.sftp
.unlink(Path::new(unsafe {
OsStr::from_encoded_bytes_unchecked(request.file_blob())
}))
.map_err(|_| CloudErrorKind::InvalidRequest)?,
(true, false) => {
fs::copy(&src, dest).map_err(|_| CloudErrorKind::InvalidRequest)?;
self.sftp
.unlink(&src.strip_prefix(&base).unwrap())
.map_err(|_| CloudErrorKind::InvalidRequest)?;
}
(false, true) => Err(CloudErrorKind::NotSupported)?, // TODO
(false, false) => Err(CloudErrorKind::InvalidRequest)?,
}

ticket.pass().unwrap();
Ok(())
}();
Expand All @@ -327,7 +271,7 @@ impl SyncFilter for Filter {
let parent = absolute.strip_prefix(&client_path).unwrap();

let dirs = self.sftp.readdir(parent).unwrap();
let placeholders = dirs
let mut placeholders = dirs
.into_iter()
.filter(|(path, _)| !Path::new(&client_path).join(path).exists())
.map(|(path, stat)| {
Expand All @@ -348,13 +292,13 @@ impl SyncFilter for Filter {
.last_write_time(stat.mtime.unwrap_or_default())
.change_time(stat.mtime.unwrap_or_default()),
)
.mark_sync()
.overwrite()
// .mark_sync() // need this?
.blob(path.into_os_string().into_encoded_bytes())
})
.collect::<Vec<_>>();

ticket.pass_with_placeholder(placeholders).unwrap();
ticket.pass_with_placeholder(&mut placeholders).unwrap();
}

fn closed(&self, request: Request, info: info::Closed) {
Expand Down Expand Up @@ -401,8 +345,6 @@ impl SyncFilter for Filter {
fn renamed(&self, _request: Request, _info: info::Renamed) {
println!("renamed");
}

// TODO: acknowledgement callbacks
}

#[derive(Error, Debug)]
Expand Down
14 changes: 6 additions & 8 deletions src/filter/info.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Debug, ops::Range, path::PathBuf};
use std::{ffi::OsString, fmt::Debug, ops::Range, path::PathBuf};

use widestring::U16CStr;
use windows::Win32::Storage::CloudFilters::{
Expand Down Expand Up @@ -266,10 +266,10 @@ pub struct Deleted(pub(crate) CF_CALLBACK_PARAMETERS_0_4);

/// Information for the [SyncFilter::rename][crate::SyncFilter::rename] callback.
#[derive(Debug)]
pub struct Rename(pub(crate) CF_CALLBACK_PARAMETERS_0_10);
pub struct Rename(pub(crate) CF_CALLBACK_PARAMETERS_0_10, pub(crate) OsString);

impl Rename {
/// Whether or not the placeholder being deleted is a directory.
/// Whether or not the placeholder being renamed is a directory.
pub fn is_directory(&self) -> bool {
(self.0.Flags & CloudFilters::CF_CALLBACK_RENAME_FLAG_IS_DIRECTORY).0 != 0
}
Expand All @@ -286,11 +286,9 @@ impl Rename {

/// The full path the placeholder is being moved to.
pub fn target_path(&self) -> PathBuf {
unsafe {
U16CStr::from_ptr_str(self.0.TargetPath.0)
.to_os_string()
.into()
}
let mut path = PathBuf::from(&self.1);
path.push(unsafe { U16CStr::from_ptr_str(self.0.TargetPath.0) }.to_os_string());
path
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/filter/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,12 @@ pub unsafe extern "system" fn notify_rename<T: SyncFilter + 'static>(
if let Some(filter) = filter_from_info::<T>(info) {
let request = Request::new(*info);
let ticket = ticket::Rename::new(request.connection_key(), request.transfer_key());
let info = info::Rename(
(*params).Anonymous.Rename,
request.volume_letter().to_os_string(),
);

filter.rename(request, ticket, info::Rename((*params).Anonymous.Rename));
filter.rename(request, ticket, info);
}
}

Expand Down

0 comments on commit a47141d

Please sign in to comment.