
RFC: Use UDFs as sources #87

Status: Closed · wants to merge 1 commit

Conversation

bakjos

bakjos commented Apr 19, 2024

No description provided.

@neverchanje

neverchanje commented Apr 23, 2024

Thanks @bakjos

I am wondering which specific HTTP service you intend to connect to through the UDF source.

While looking into this requirement, I also came up with another idea:

If the requirement is to subscribe to a poll-based API, such as the Twitter streaming API, maybe we can introduce an external component for handling webhooks.

Here is a brief proposal:

RisingWave WebSync APIs

[image: RisingWave WebSync APIs diagram]

Instead of supporting arbitrary webhooks via SQL within RisingWave, this solution abstracts all integrations into a single, unified WebSync API.

For example, to subscribe to GitHub events of a repo, RisingWave will first poll the WebSync Server, which in turn polls from GitHub. The pipeline operates in a stateless manner, without the need for extra storage like Kafka.

This solution hides upstream details from RisingWave and allows users to customize their adapters.

Pros:

  • Highly extensible. One solution covers all scenarios.
  • Easy to implement. It's safer to use 3rd-party libraries in an external component than in RisingWave.

Cons:

  • Deployment complexity increases.
  • One additional network hop caused by the WebSync Server.
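To make the shape of this proposal concrete, here is a minimal sketch of how a WebSync-style adapter could normalize upstream events into one unified envelope that RisingWave then polls. All names here are hypothetical illustrations, not an actual RisingWave or WebSync API, and a stubbed upstream stands in for the live GitHub HTTP call:

```python
import json
from dataclasses import dataclass

@dataclass
class WebSyncEvent:
    """Unified envelope the WebSync server would hand to RisingWave."""
    source: str   # e.g. "github"
    cursor: str   # opaque position, enabling stateless re-polling
    payload: str  # raw upstream event as a JSON string

def normalize_github_event(raw: dict) -> WebSyncEvent:
    """Adapter: maps one GitHub events-API item into the unified envelope."""
    return WebSyncEvent(
        source="github",
        cursor=str(raw["id"]),
        payload=json.dumps(raw),
    )

def poll_once(fetch_upstream, since_cursor: str) -> list[WebSyncEvent]:
    """One stateless poll cycle: fetch, filter by cursor, normalize."""
    events = [e for e in fetch_upstream() if str(e["id"]) > since_cursor]
    return [normalize_github_event(e) for e in events]

# Stubbed upstream instead of a live HTTP call:
fake_upstream = lambda: [{"id": 1, "type": "PushEvent"}, {"id": 2, "type": "ForkEvent"}]
batch = poll_once(fake_upstream, since_cursor="1")
print([e.cursor for e in batch])  # -> ['2']
```

Because each cycle carries its position in the cursor, the server itself needs no durable storage, matching the "stateless pipeline" claim above.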

@bakjos
Author

bakjos commented Apr 23, 2024

@neverchanje I have 2 use cases that can be supported through UDFs: one is to pull data using a GraphQL query and update it using long polling (which is not supported through a webhook), and the other is to support GraphQL subscriptions using SSE as the protocol. Both use OAuth with service-account client credentials.

Although both could be supported through custom sources, the time and effort required to develop these sources, the different authentication mechanisms, and the different protocols make that harder to implement. UDFs give users more flexibility: they can use custom libraries they have already developed, connect to data sources not currently supported by RisingWave, and work in a language they are already comfortable with (Java, JavaScript, Python, Rust).

I agree we should always prefer a custom data source implemented for RW, but there are many to be implemented, to name a few: GraphQL (Hasura, WebSocket subscriptions, SSE subscriptions, multipart subscriptions), HTTP long polling, webhooks, AMQP (RabbitMQ), cube.js, vector databases (Qdrant, Pinecone). And there are always nuances in how to connect to the existing ones, which can limit which sources we can connect to.
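As a rough illustration of what such a UDF-based source might look like for the SSE use case above: a table-function-style UDF that parses a Server-Sent Events stream and yields one row per event. The function name and the plain-iterable interface are hypothetical sketches, not RisingWave's actual UDF API; in a real UDF the stream would be an OAuth-authenticated HTTP response body iterated line by line:

```python
from typing import Iterator

def parse_sse_lines(lines) -> Iterator[str]:
    """Minimal SSE parser: collects `data:` lines until a blank line ends an event."""
    buf = []
    for line in lines:
        if line.startswith("data:"):
            buf.append(line[len("data:"):].strip())
        elif line == "" and buf:
            yield "\n".join(buf)
            buf = []

def graphql_sse_source(stream) -> Iterator[str]:
    """Hypothetical table-function source: each yielded string becomes a row."""
    yield from parse_sse_lines(stream)

# Simulated SSE stream (two events, each terminated by a blank line):
sample = ['data: {"op":"insert"}', "", 'data: {"op":"update"}', ""]
print(list(graphql_sse_source(sample)))  # -> ['{"op":"insert"}', '{"op":"update"}']
```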

@neverchanje

neverchanje commented Apr 26, 2024

@bakjos sorry for my late response.

I generally align with your goal of providing enough flexibility for more sources. However, in terms of implementation, I believe we should ensure the reusability of connector code. I am not 100% against embedding user code within the database, even though it could make the database difficult to maintain. But I think we should try to minimize the amount of user code unless absolutely necessary. For example, when it comes to Hasura GraphQL SSE, I generally prefer offering a reusable, packaged connector instead of letting users write everything from scratch.

--- edit

Personally, I like the idea of integrating Deno, aside from concerns about the significantly longer compilation time. But I'm not a fan of using the UDF framework for sources.

@neverchanje

Even if we go in this direction, I doubt whether the UDF framework, designed for stateless computation, is suitable for ingestion as well. 🤔 Here are some points that should be addressed in the framework:

  • Backfilling
  • State management (e.g., consumption progress)

BTW, I wonder why you want to consume from vector databases such as Qdrant and Pinecone.

@ly9chee

ly9chee commented Apr 26, 2024

Hi @neverchanje, would RW consider providing a plugin mechanism for custom Sources/Sinks? That way, users who have a special requirement for a particular Source/Sink can implement it as a plugin, and RW can load the plugin at runtime.
As long as the user adheres to the plugin contract — for sources, for example: can it replay from a specific offset? does it support parallel reading (like Kafka, where each of multiple partitions is independent)? — RW can have confidence in interacting with the outside world, and this also enhances the flexibility of data integration.

Flink users can achieve this by implementing a SourceFunction or SinkFunction and placing their JAR in a location where Flink can find it at runtime. IMO, there are more than thousands of database systems in the world. RW may not be able to implement all of them, but it can implement the most popular data sources such as MongoDB, MySQL, and Postgres, so that users can use them out of the box. For less common data sources, users may implement them themselves.
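The plugin contract described above could look roughly like this. This is a hypothetical interface sketch, not an actual RisingWave API (Flink's SourceFunction/Source interfaces play the analogous role there); the toy in-memory implementation just demonstrates the replay-from-offset and partition requirements:

```python
from abc import ABC, abstractmethod
from typing import Iterator

class SourcePlugin(ABC):
    """Hypothetical contract a custom source plugin would have to satisfy."""

    @abstractmethod
    def splits(self) -> list[str]:
        """Independent partitions for parallel reading (like Kafka partitions)."""

    @abstractmethod
    def read(self, split: str, offset: int) -> Iterator[tuple[int, bytes]]:
        """Replay (offset, payload) pairs from a specific offset for one split."""

class InMemorySource(SourcePlugin):
    """Toy implementation showing that replay-from-offset is honored."""
    def __init__(self, data: dict[str, list[bytes]]):
        self.data = data

    def splits(self) -> list[str]:
        return sorted(self.data)

    def read(self, split, offset):
        for i, payload in enumerate(self.data[split]):
            if i >= offset:
                yield (i, payload)

src = InMemorySource({"p0": [b"a", b"b", b"c"]})
print(list(src.read("p0", offset=1)))  # -> [(1, b'b'), (2, b'c')]
```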

@neverchanje

@ly9chee thanks for this advice. I agree with your opinion that we can introduce a plugin system to enable users to integrate with more sources. I am actually exploring this direction as well.

Nevertheless, I doubt whether using JavaScript for pluggable sources is a good idea. Our current options include Java (we use JNI for several sinks and sources like Iceberg) and Rust. We'll probably not use other languages, at least in the short term.

@ly9chee

ly9chee commented Apr 26, 2024

@neverchanje thanks for your reply! IMO, it's all about the Source/Sink plugin interface that RW will offer to the user, and the language used to implement the plugin is less critical (even Rust is acceptable). For now, it is impossible to ingest data from, or sink data to, an external data source unsupported by RW unless an intermediate component (such as Kafka) is used to transfer the data. This requires some extra effort before RW can process the data and also involves third-party components, which increases the administrative work.

@neverchanje

😄 "Even Rust is acceptable" — I wrote exactly the same thing in my design proposal.

But please note that CDC is more complex. Our current CDC framework involves both Java and Rust, as we utilize Debezium under the hood, and we also implement our own backfilling logic and shared-source.

I plan to initially prioritize the Source, particularly focusing on HTTP APIs like Github, Salesforce, Jira, etc.

@bakjos
Author

bakjos commented Apr 26, 2024

@neverchanje UDFs are not restricted to JavaScript; I am only using it as an example because it supports async and external calls through the fetch API, but the same behavior could be implemented in any of the other languages. Things like state or backfilling could be supported by exposing some objects/interfaces to the UDF, which can be done easily in Deno.

Plugins are a bit harder to implement in Rust without using FFI, or WASM (which the UDFs already support). One of the advantages of using UDFs is that it can be done at runtime without having to compile/rebuild/redeploy the entire project.

Vector databases are probably bad examples for sources, since they are more suitable as sinks, but any other database with a JavaScript/Java/Rust API could be used as a source through UDFs.

@neverchanje

> Things like state or backfilling could be implemented by exposing some objects/interfaces to the UDF

Stateful UDF is not in our plan actually. cc @wangrunji0408 Because it can be tricky to limit the use of state stores while exposing the interface. We might consider opening the gate for UDFs as sources, but not for UDFs in stream processing.

@wangrunji0408

> > Things like state or backfilling could be implemented by exposing some objects/interfaces to the UDF
>
> Stateful UDF is not in our plan actually. cc @wangrunji0408 Because it can be tricky to limit the use of state stores while exposing the interface. We might consider opening the gate for UDFs as sources, but not for UDFs in stream processing.

The UDF itself should be stateless. But we can pass the state as a parameter to the function. Further, we can even expose host functions for the UDF to call. This is feasible in both WASM and Deno. So I think there's no problem in providing such flexibility while keeping the UDF stateless.
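A minimal sketch of that pattern (function and field names are hypothetical; the point is only that the function stays pure while the host owns and persists the state between calls):

```python
def poll_source(state: dict) -> tuple[list[str], dict]:
    """Stateless UDF: takes the current state in, returns (rows, new state).

    The host (e.g. RisingWave) persists `state` between calls; the function
    never touches a state store itself. `state["backlog"]` stands in for
    whatever the upstream system would return past the recorded offset.
    """
    offset = state.get("offset", 0)
    backlog = state.get("backlog", [])
    rows = backlog[offset:]
    new_state = {**state, "offset": offset + len(rows)}
    return rows, new_state

state = {"backlog": ["e1", "e2", "e3"], "offset": 0}
rows, state = poll_source(state)
print(rows, state["offset"])       # -> ['e1', 'e2', 'e3'] 3
rows, state = poll_source(state)   # second call: nothing new past the offset
print(rows)                        # -> []
```

Backfilling fits the same shape: the host rewinds the persisted offset and replays the same pure function.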

@stdrc
Member

stdrc commented Apr 29, 2024

> RisingWave WebSync APIs
>
> [image: RisingWave WebSync APIs diagram]
>
> Instead of supporting arbitrary webhooks via SQL within RisingWave, this solution abstracts all integrations into a single, unified WebSync API.
>
> For example, to subscribe to GitHub events of a repo, RisingWave will first poll the WebSync Server, which in turn polls from GitHub. The pipeline operates in a stateless manner, without the need for extra storage like Kafka.
>
> This solution hides upstream details from RisingWave and allows users to customize their adapters.
>
> Pros:
>
>   • Highly extensible. One solution covers all scenarios.
>   • Easy to implement. It's safer to use 3rd-party libraries in an external component than in RisingWave.
>
> Cons:
>
>   • Deployment complexity increases.
>   • One additional network hop caused by the WebSync Server.

@neverchanje Seems to be a valid design for risingwavelabs/risingwave#16025

@tabVersion tabVersion requested a review from xiangjinwu May 2, 2024 06:27
@fuyufjh
Member

fuyufjh commented May 14, 2024

Noting down some thoughts from an offline discussion.

There are generally two ways to allow users to ingest from customized sources:

  • Webhook: RisingWave listens on a particular HTTP endpoint and data can be pushed into it.
  • User-defined source: RisingWave runs a UDF or something similar to pull data.

Both approaches have their fitting cases, depending on whether the external system prefers to push or pull data. For example, GitHub naturally publishes changes via webhooks, while a 3rd-party database or messaging system would most likely provide a pull-based client.

I just took a look at MZ's webhook support. The authorization is a big challenge. I would rather offload this to a cloud-based stateless service that converts an HTTP request into an INSERT into RisingWave.

Regarding user-defined sources/sinks, I agree with this RFC that reusing UDFs seems to be a good idea. Hopefully this can achieve the goal with minimal work and complexity.

@fuyufjh
Member

fuyufjh commented Sep 6, 2024

Update:

We decided to work on both a Webhook Source and an HTTP POST API to enable similar usage.

  • Webhook Source allows users to define a webhook source in the RisingWave frontend. Other applications can call this API to ingest data in the format defined by the source.
  • HTTP POST API works in cloud only. It accepts HTTP POST requests and translates them into equivalent INSERTs into a RisingWave table.

We believe the use cases mentioned in this RFC can be more or less covered by these two approaches.
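The HTTP POST API described above amounts to a thin translation layer. As a sketch (hypothetical table and function names, not the actual cloud service; parameterized to avoid SQL injection):

```python
import json

def post_to_insert(table: str, body: str) -> tuple[str, list]:
    """Translate one HTTP POST body (a flat JSON object) into a
    parameterized INSERT for a table with matching column names."""
    record = json.loads(body)
    cols = sorted(record)  # deterministic column order
    placeholders = ", ".join(f"${i + 1}" for i in range(len(cols)))
    sql = f'INSERT INTO {table} ({", ".join(cols)}) VALUES ({placeholders})'
    return sql, [record[c] for c in cols]

sql, params = post_to_insert("events", '{"user_id": 42, "action": "click"}')
print(sql)     # -> INSERT INTO events (action, user_id) VALUES ($1, $2)
print(params)  # -> ['click', 42]
```

Being stateless, such a service scales horizontally and keeps all durability concerns inside RisingWave itself.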

Thanks a lot for the proposal anyway! @bakjos

Also cc. @neverchanje @cloudcarver @KeXiangWang
