Skip to content

Commit

Permalink
minor
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Oct 30, 2023
1 parent eef23a8 commit 3485c04
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 7 deletions.
8 changes: 3 additions & 5 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;

use futures::future::try_join_all;
use futures::stream::pending;
use futures::{pin_mut, FutureExt, Stream, StreamExt};
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::catalog::ColumnId;
Expand All @@ -31,7 +31,7 @@ use risingwave_connector::source::filesystem::{FsPage, FsPageItem, S3SplitEnumer
use risingwave_connector::source::{
create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties,
ConnectorState, FsFilterCtrlCtx, FsListInner, SourceColumnDesc, SourceContext,
SourceEnumeratorContext, SplitEnumerator, SplitReader, StreamChunkWithState,
SourceEnumeratorContext, SplitEnumerator, SplitReader,
};
use tokio::time;
use tokio::time::{Duration, MissedTickBehavior};
Expand Down Expand Up @@ -177,9 +177,7 @@ impl ConnectorSource {
.await?
};

let mut stream = select_all(readers.into_iter().map(|r| r.into_stream()))
.boxed()
.peekable();
let mut stream = select_all(readers.into_iter().map(|r| r.into_stream())).peekable();
// peek the stream to make sure the underlying connector has been inited
let _ = Pin::new(&mut stream).peek().await;
Ok(stream.boxed())
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
// limitations under the License.

use std::fmt::Formatter;
use std::pin::Pin;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use futures::{pin_mut, StreamExt};
use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_connector::source::{
Expand Down

0 comments on commit 3485c04

Please sign in to comment.