From e383ad60cb3a07c0008b53921816271fd64979a1 Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Tue, 20 Aug 2024 13:29:29 +0800 Subject: [PATCH] fix(connector): file source do not panic when credential is wrong (#17935) --- .../filesystem/opendal_source/opendal_enumerator.rs | 13 +++++++++++-- src/stream/src/executor/source/list_executor.rs | 4 ++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index 864d1de56c7b..cffeb5dfe5f6 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -14,6 +14,7 @@ use std::marker::PhantomData; +use anyhow::anyhow; use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::stream::{self, BoxStream}; @@ -51,14 +52,22 @@ impl SplitEnumerator for OpendalEnumerator { async fn list_splits(&mut self) -> ConnectorResult>> { let empty_split: OpendalFsSplit = OpendalFsSplit::empty_split(); + let prefix = self.prefix.as_deref().unwrap_or("/"); - Ok(vec![empty_split]) + match self.op.list(prefix).await { + Ok(_) => return Ok(vec![empty_split]), + Err(e) => { + return Err(anyhow!(e) + .context("fail to create source, please check your config.") + .into()) + } + } } } impl OpendalEnumerator { pub async fn list(&self) -> ConnectorResult { - let prefix = self.prefix.as_deref().unwrap_or(""); + let prefix = self.prefix.as_deref().unwrap_or("/"); let object_lister = self .op diff --git a/src/stream/src/executor/source/list_executor.rs b/src/stream/src/executor/source/list_executor.rs index 25b32c0a0e4b..c11ba773648b 100644 --- a/src/stream/src/executor/source/list_executor.rs +++ b/src/stream/src/executor/source/list_executor.rs @@ -99,6 +99,10 @@ impl FsListExecutor { .collect::>(); let res: Vec<(Op, OwnedRow)> = rows.into_iter().flatten().collect(); + if res.is_empty() { + tracing::warn!("No items were listed from source."); + return Ok(StreamChunk::default()); + } Ok(StreamChunk::from_rows( &res, &[DataType::Varchar, DataType::Timestamptz, DataType::Int64],