From 553a297037157e606c83ee55aa6ed5de3d884a42 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 25 Mar 2024 20:46:54 +0800 Subject: [PATCH] Fix sftp support Signed-off-by: Xuanwo --- core/src/services/sftp/backend.rs | 48 +++++++------------ core/src/services/sftp/mod.rs | 1 + core/src/services/sftp/reader.rs | 80 +++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 31 deletions(-) create mode 100644 core/src/services/sftp/reader.rs diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 936382dd9be7..366e18d4efaa 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -22,7 +22,6 @@ use std::path::Path; use std::path::PathBuf; use async_trait::async_trait; -use bytes::Bytes; use futures::StreamExt; use log::debug; use openssh::KnownHosts; @@ -36,6 +35,7 @@ use super::error::is_sftp_protocol_error; use super::error::parse_sftp_error; use super::error::parse_ssh_error; use super::lister::SftpLister; +use super::reader::SftpReader; use super::writer::SftpWriter; use crate::raw::*; use crate::*; @@ -210,7 +210,6 @@ impl Builder for SftpBuilder { key: self.config.key.clone(), known_hosts_strategy, copyable: self.config.enable_copy, - client: tokio::sync::OnceCell::new(), }) } @@ -223,6 +222,7 @@ impl Builder for SftpBuilder { } /// Backend is used to serve `Accessor` support for sftp. +#[derive(Clone)] pub struct SftpBackend { endpoint: String, root: String, @@ -230,7 +230,6 @@ pub struct SftpBackend { key: Option, known_hosts_strategy: KnownHosts, copyable: bool, - client: tokio::sync::OnceCell, } impl Debug for SftpBackend { @@ -241,7 +240,7 @@ impl Debug for SftpBackend { #[async_trait] impl Accessor for SftpBackend { - type Reader = Bytes; + type Reader = SftpReader; type Writer = SftpWriter; type Lister = Option; type BlockingReader = (); @@ -309,18 +308,10 @@ impl Accessor for SftpBackend { } async fn read(&self, path: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { - let client = self.connect().await?; - - let mut fs = client.fs(); - fs.set_cwd(&self.root); - let path = fs.canonicalize(path).await.map_err(parse_sftp_error)?; - - let _f = client - .open(path.as_path()) - .await - .map_err(parse_sftp_error)?; - - todo!() + Ok(( + RpRead::default(), + SftpReader::new(self.clone(), self.root.clone(), path.to_owned()), + )) } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -467,21 +458,16 @@ impl Accessor for SftpBackend { } impl SftpBackend { - async fn connect(&self) -> Result<&Sftp> { - let sftp = self - .client - .get_or_try_init(|| { - Box::pin(connect_sftp( - self.endpoint.as_str(), - self.root.clone(), - self.user.clone(), - self.key.clone(), - self.known_hosts_strategy.clone(), - )) - }) - .await?; - - Ok(sftp) + /// TODO: implement connection pool in the future. + pub async fn connect(&self) -> Result { + connect_sftp( + self.endpoint.as_str(), + self.root.clone(), + self.user.clone(), + self.key.clone(), + self.known_hosts_strategy.clone(), + ) + .await } } diff --git a/core/src/services/sftp/mod.rs b/core/src/services/sftp/mod.rs index fcc74afc2951..001898171b0f 100644 --- a/core/src/services/sftp/mod.rs +++ b/core/src/services/sftp/mod.rs @@ -21,5 +21,6 @@ pub use backend::SftpConfig; mod backend; mod error; mod lister; +mod reader; mod utils; mod writer; diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs new file mode 100644 index 000000000000..6f421023fe2d --- /dev/null +++ b/core/src/services/sftp/reader.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::backend::SftpBackend; +use super::error::parse_sftp_error; +use crate::raw::*; +use crate::*; +use bytes::BytesMut; +use std::io::SeekFrom; +use tokio::io::AsyncSeekExt; + +pub struct SftpReader { + inner: SftpBackend, + root: String, + path: String, +} + +impl SftpReader { + pub fn new(inner: SftpBackend, root: String, path: String) -> Self { + Self { inner, root, path } + } +} + +impl oio::Read for SftpReader { + async fn read_at(&self, offset: u64, limit: usize) -> Result { + let client = self.inner.connect().await?; + + let mut fs = client.fs(); + fs.set_cwd(&self.root); + + let path = fs + .canonicalize(&self.path) + .await + .map_err(parse_sftp_error)?; + + let mut f = client + .open(path.as_path()) + .await + .map_err(parse_sftp_error)?; + + f.seek(SeekFrom::Start(offset)) + .await + .map_err(new_std_io_error)?; + + let mut size = limit; + if size == 0 { + return Ok(oio::Buffer::new()); + } + + let mut buf = BytesMut::with_capacity(limit); + while size > 0 { + let len = buf.len(); + if let Some(bytes) = f + .read(size as u32, buf.split_off(len)) + .await + .map_err(parse_sftp_error)? + { + size -= bytes.len(); + buf.unsplit(bytes); + } else { + break; + } + } + Ok(oio::Buffer::from(buf.freeze())) + } +}