bug: s3_v2 connector cannot ingest new add files #17671

Closed
tabVersion opened this issue Jul 12, 2024 · 2 comments · Fixed by #17702
Labels: type/bug Something isn't working
Milestone: release-1.11

Comments

tabVersion commented Jul 12, 2024

Describe the bug

Originally reported by a user from the community: https://risingwave-community.slack.com/archives/C03BW71523T/p1720701467609019

@wcy-fdu and I have identified the root cause: the list executor lists the files only once and does not keep looping, which causes the bug.

let chunked_stream = stream.chunks(CHUNK_SIZE).map(|chunk| {
    let rows = chunk
        .into_iter()
        .map(|item| match item {
            Ok(page_item) => Ok((
                Op::Insert,
                OwnedRow::new(vec![
                    Some(ScalarImpl::Utf8(page_item.name.into_boxed_str())),
                    Some(ScalarImpl::Timestamptz(page_item.timestamp)),
                    Some(ScalarImpl::Int64(page_item.size)),
                ]),
            )),
            Err(e) => {
                tracing::error!(error = %e.as_report(), "Connector fail to list item");
                Err(e)
            }
        })
        .collect::<Vec<_>>();
    let res: Vec<(Op, OwnedRow)> = rows.into_iter().flatten().collect();
    Ok(StreamChunk::from_rows(
        &res,
        &[DataType::Varchar, DataType::Timestamptz, DataType::Int64],
    ))
});
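
For context, the direction of the fix is to keep re-listing the source instead of listing only once. Below is a minimal sketch of that idea, not the actual code in #17702: list_prefix is a hypothetical helper standing in for the real S3 list call, and the 60-second interval is an arbitrary assumption.

use std::collections::HashSet;
use std::time::Duration;

// Sketch only: periodically re-list the prefix and forward keys that have
// not been seen before, so newly added files are picked up over time.
async fn list_loop(mut list_prefix: impl FnMut() -> Vec<String>) {
    let mut seen: HashSet<String> = HashSet::new();
    let mut ticker = tokio::time::interval(Duration::from_secs(60)); // assumed cadence
    loop {
        ticker.tick().await; // wait for the next listing cycle
        for key in list_prefix() {
            // HashSet::insert returns true only for keys not emitted before,
            // so repeated listings do not duplicate already-ingested files.
            if seen.insert(key.clone()) {
                println!("new file discovered: {key}");
            }
        }
    }
}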

Error message/log

No response

To Reproduce

No response

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

No response

Additional context

No response

tabVersion added the type/bug (Something isn't working) label Jul 12, 2024
github-actions bot added this to the release-1.11 milestone Jul 12, 2024

st1page commented Jul 12, 2024

Maybe there should be some test for this case 🤔

marceloneppel (Contributor) commented

Using the RisingWave operator, is it safe to reschedule the compute node pod (by, for example, killing the current pod) to force the ingestion of the newly added files?
