Use UDFs as sources #16223

Closed
bakjos opened this issue Apr 9, 2024 · 2 comments


bakjos commented Apr 9, 2024

Background

Given the number of possible sources and how differently clients consume them, UDFs could make it easier to support more of them, for example:

  • HTTP long polling (internal APIs)
  • WebSockets/SSE (GraphQL/REST)
  • gRPC endpoints

Design

UDFs already support table functions (UDTFs), which use yield to produce their output as an asynchronous stream:

async fn eval_inner<'a>(&'a self, input: &'a DataChunk) {
    // evaluate children expressions
    let mut columns = Vec::with_capacity(self.children.len());
    for c in &self.children {
        let val = c.eval(input).await?;
        columns.push(val);
    }
    let direct_input = DataChunk::new(columns, input.visibility().clone());

    // compact the input chunk and record the row mapping
    let visible_rows = direct_input.visibility().iter_ones().collect_vec();
    let compacted_input = direct_input.compact_cow();
    let arrow_input = RecordBatch::try_from(compacted_input.as_ref())?;

    // call UDTF
    #[for_await]
    for res in self
        .client
        .call_table_function(&self.identifier, arrow_input)
    {
        let output = DataChunk::try_from(&res?)?;
        self.check_output(&output)?;

        // we send the compacted input to UDF, so we need to map the row indices back to the
        // original input
        let origin_indices = output
            .column_at(0)
            .as_int32()
            .raw_iter()
            // we have checked all indices are non-negative
            .map(|idx| visible_rows[idx as usize] as i32)
            .collect::<I32Array>();

        let output = DataChunk::new(
            vec![origin_indices.into_ref(), output.column_at(1).clone()],
            output.visibility().clone(),
        );
        yield output;
    }
}
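
The row-mapping step in the snippet above can be looked at in isolation. The following is a minimal sketch that uses plain slices instead of DataChunk and I32Array; the helper names visible_rows and map_to_original are hypothetical and only stand in for the iter_ones() call and the closure passed to map. Unlike the original, which relies on an earlier check, this version returns None for a negative or out-of-range index.

```rust
/// Collect the positions of the visible rows of a chunk.
/// Stand-in for `visibility().iter_ones().collect_vec()` (assumption: the
/// visibility bitmap is modelled here as a slice of booleans).
fn visible_rows(visibility: &[bool]) -> Vec<usize> {
    visibility
        .iter()
        .enumerate()
        .filter_map(|(i, &visible)| visible.then_some(i))
        .collect()
}

/// Map row indices that refer to the compacted chunk back to row indices of
/// the original chunk. Returns `None` if any index is negative or does not
/// refer to a visible row.
fn map_to_original(compact_indices: &[i32], visible_rows: &[usize]) -> Option<Vec<i32>> {
    compact_indices
        .iter()
        .map(|&idx| {
            if idx < 0 {
                return None;
            }
            visible_rows.get(idx as usize).map(|&row| row as i32)
        })
        .collect()
}
```

For example, with a visibility of [true, false, true, true, false] the visible rows are [0, 2, 3], so the compacted indices [0, 0, 2, 1] returned by the UDTF map back to [0, 0, 3, 2] in the original chunk.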

The same mechanism could be extended to tables and sources, so that a record is emitted whenever a new value is received from the UDF, using a syntax like CREATE [TABLE | SOURCE] .. AS <function_name>(param1, param2)
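
To make the idea concrete, here is a rough sketch of a source driven by a user function, written with only the Rust standard library. It is not how RisingWave implements sources: spawn_udf_source is a hypothetical name, a thread plus channel stands in for the async stream, and records are plain strings. The closure plays the role of the UDF and sends each value as soon as it has one; the receiver is what the source would poll.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for a UDF-backed source. The `udf` closure runs on
/// its own thread and pushes records into the channel as they arrive, which
/// is the analogue of `yield` in a table function.
fn spawn_udf_source<F>(udf: F) -> Receiver<String>
where
    F: FnOnce(Sender<String>) + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    // The sender is moved into the UDF. When the UDF returns, the sender is
    // dropped and the channel closes, which ends the stream on the receiver side.
    thread::spawn(move || udf(tx));
    rx
}
```

A consumer can then iterate over the receiver (for record in rx.iter() { .. }) and turn each record into a row; a long-polling or WebSocket client would simply never return from the closure while the connection is alive.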

Future Optimizations

  • Add support for async wasm UDFs
  • Define a way to provide splits
  • Add support for local state (to keep things such as auth tokens)
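
As an illustration of the local-state item, the sketch below shows one possible shape for state that survives between UDF calls: a cached auth token with a time-to-live. Everything here is an assumption made for the example (the TokenCache type, the get_or_refresh method, the TTL-based expiry); the issue does not specify how local state would be exposed to UDFs.

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-source state kept between UDF invocations.
struct TokenCache {
    /// The cached token and the instant at which it was fetched.
    token: Option<(String, Instant)>,
    /// How long a fetched token is considered valid.
    ttl: Duration,
}

impl TokenCache {
    fn new(ttl: Duration) -> Self {
        Self { token: None, ttl }
    }

    /// Return the cached token, calling `fetch` only when nothing is cached
    /// or the cached token is at least `ttl` old at time `now`.
    fn get_or_refresh(&mut self, now: Instant, fetch: impl FnOnce() -> String) -> &str {
        let expired = match &self.token {
            Some((_, fetched_at)) => now.duration_since(*fetched_at) >= self.ttl,
            None => true,
        };
        if expired {
            self.token = Some((fetch(), now));
        }
        match &self.token {
            Some((token, _)) => token.as_str(),
            None => unreachable!("token is always set at this point"),
        }
    }
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real implementation would also need to decide where this state lives and whether it is persisted across restarts.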

Discussions

No response

Q&A

No response


github-actions bot commented Aug 1, 2024

This issue has been open for 60 days with no activity.

If you think it is still relevant today, and needs to be done in the near future, you can comment to update the status, or just manually remove the no-issue-activity label.

You can also confidently close this issue as not planned to keep our backlog clean.
Don't worry if you think the issue is still valuable to continue in the future.
It's searchable and can be reopened when it's time. 😄

@fuyufjh fuyufjh removed this from the release-1.9 milestone Oct 8, 2024
@fuyufjh fuyufjh assigned stdrc and unassigned wangrunji0408 and xxchan Oct 8, 2024

stdrc commented Oct 11, 2024

@stdrc stdrc closed this as not planned Oct 11, 2024