Skip to content

Commit

Permalink
Fix sftp support
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Mar 25, 2024
1 parent 9bde47d commit 553a297
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 31 deletions.
48 changes: 17 additions & 31 deletions core/src/services/sftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::path::Path;
use std::path::PathBuf;

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use log::debug;
use openssh::KnownHosts;
Expand All @@ -36,6 +35,7 @@ use super::error::is_sftp_protocol_error;
use super::error::parse_sftp_error;
use super::error::parse_ssh_error;
use super::lister::SftpLister;
use super::reader::SftpReader;
use super::writer::SftpWriter;
use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -210,7 +210,6 @@ impl Builder for SftpBuilder {
key: self.config.key.clone(),
known_hosts_strategy,
copyable: self.config.enable_copy,
client: tokio::sync::OnceCell::new(),
})
}

Expand All @@ -223,14 +222,14 @@ impl Builder for SftpBuilder {
}

/// Backend is used to serve `Accessor` support for sftp.
#[derive(Clone)]
pub struct SftpBackend {
endpoint: String,
root: String,
user: Option<String>,
key: Option<String>,
known_hosts_strategy: KnownHosts,
copyable: bool,
client: tokio::sync::OnceCell<Sftp>,
}

impl Debug for SftpBackend {
Expand All @@ -241,7 +240,7 @@ impl Debug for SftpBackend {

#[async_trait]
impl Accessor for SftpBackend {
type Reader = Bytes;
type Reader = SftpReader;
type Writer = SftpWriter;
type Lister = Option<SftpLister>;
type BlockingReader = ();
Expand Down Expand Up @@ -309,18 +308,10 @@ impl Accessor for SftpBackend {
}

async fn read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
let client = self.connect().await?;

let mut fs = client.fs();
fs.set_cwd(&self.root);
let path = fs.canonicalize(path).await.map_err(parse_sftp_error)?;

let _f = client
.open(path.as_path())
.await
.map_err(parse_sftp_error)?;

todo!()
Ok((
RpRead::default(),
SftpReader::new(self.clone(), self.root.clone(), path.to_owned()),
))
}

async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Expand Down Expand Up @@ -467,21 +458,16 @@ impl Accessor for SftpBackend {
}

impl SftpBackend {
async fn connect(&self) -> Result<&Sftp> {
let sftp = self
.client
.get_or_try_init(|| {
Box::pin(connect_sftp(
self.endpoint.as_str(),
self.root.clone(),
self.user.clone(),
self.key.clone(),
self.known_hosts_strategy.clone(),
))
})
.await?;

Ok(sftp)
/// TODO: implement connection pool in the future.
pub async fn connect(&self) -> Result<Sftp> {
connect_sftp(
self.endpoint.as_str(),
self.root.clone(),
self.user.clone(),
self.key.clone(),
self.known_hosts_strategy.clone(),
)
.await
}
}

Expand Down
1 change: 1 addition & 0 deletions core/src/services/sftp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ pub use backend::SftpConfig;
mod backend;
mod error;
mod lister;
mod reader;
mod utils;
mod writer;
80 changes: 80 additions & 0 deletions core/src/services/sftp/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use super::backend::SftpBackend;
use super::error::parse_sftp_error;
use crate::raw::*;
use crate::*;
use bytes::BytesMut;
use std::io::SeekFrom;
use tokio::io::AsyncSeekExt;

pub struct SftpReader {
inner: SftpBackend,
root: String,
path: String,
}

impl SftpReader {
pub fn new(inner: SftpBackend, root: String, path: String) -> Self {
Self { inner, root, path }
}
}

impl oio::Read for SftpReader {
async fn read_at(&self, offset: u64, limit: usize) -> Result<oio::Buffer> {
let client = self.inner.connect().await?;

let mut fs = client.fs();
fs.set_cwd(&self.root);

let path = fs
.canonicalize(&self.path)
.await
.map_err(parse_sftp_error)?;

let mut f = client
.open(path.as_path())
.await
.map_err(parse_sftp_error)?;

f.seek(SeekFrom::Start(offset))
.await
.map_err(new_std_io_error)?;

let mut size = limit;
if size == 0 {
return Ok(oio::Buffer::new());
}

let mut buf = BytesMut::with_capacity(limit);
while size > 0 {
let len = buf.len();
if let Some(bytes) = f
.read(size as u32, buf.split_off(len))
.await
.map_err(parse_sftp_error)?
{
size -= bytes.len();
buf.unsplit(bytes);
} else {
break;
}
}
Ok(oio::Buffer::from(buf.freeze()))
}
}

0 comments on commit 553a297

Please sign in to comment.