avoid calling UDF when the row being updated to is the same as existing row #12201

Open
Tracked by #7405
lmatz opened this issue Sep 11, 2023 · 16 comments

Comments

@lmatz
Contributor

lmatz commented Sep 11, 2023

Users report that their UDF is particularly expensive, e.g. in terms of computation overhead, or because it calls other services that charge by the number of calls.

The pipeline ends in an MV in this case, but I also wonder whether the solution changes when it ends in a sink.

An idea of a viable solution is preferred for now. The time of implementation can be discussed later.

@github-actions github-actions bot added this to the release-1.3 milestone Sep 11, 2023
@lmatz
Contributor Author

lmatz commented Sep 11, 2023

Do we have the assumption that UpdateInsert and UpdateDelete are consecutive in the stream chunk? If yes, can we compact the chunk to resolve it?

Does it have anything to do with the other expressions in the same project that UDF is placed in?

@wangrunji0408
Contributor

Calling other services in a UDF has a significant impact on performance. @jon-chuang once added a feature to parallelize these blocking function calls using a thread pool. This can be enabled with the io_threads parameter in the @udf annotation:

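import time
from risingwave.udf import udf

# io_threads=32: run the blocking calls on a pool of 32 threads
@udf(input_types=["INT"], result_type="INT", io_threads=32)
def wait_concurrent(x):
    time.sleep(0.01)  # stands in for a blocking external call
    return 0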

This could help reduce the latency introduced by external calls.

As for the number of calls, if necessary, we can provide a batch evaluation API. This way, users will receive a batch of input at a time. Then they might be able to call external services in batch style.
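To illustrate the batch idea, here is a minimal sketch of how batching reduces the number of external calls. It is not an existing RisingWave API: `evaluate_in_batches`, `fake_service`, and `batch_size` are hypothetical names, and `fake_service` merely stands in for a service that charges per call.

```python
from typing import Callable, Iterable, List


def evaluate_in_batches(
    inputs: Iterable[int],
    call_service: Callable[[List[int]], List[int]],
    batch_size: int = 256,
) -> List[int]:
    """Send inputs to call_service in groups of at most batch_size."""
    results: List[int] = []
    batch: List[int] = []
    for x in inputs:
        batch.append(x)
        if len(batch) == batch_size:
            results.extend(call_service(batch))
            batch = []
    if batch:  # flush the last, possibly smaller, batch
        results.extend(call_service(batch))
    return results


# Hypothetical external service: records the size of every request it gets.
calls: List[int] = []


def fake_service(xs: List[int]) -> List[int]:
    calls.append(len(xs))
    return [x * 2 for x in xs]


out = evaluate_in_batches(range(10), fake_service, batch_size=4)
```

With ten inputs and a batch size of four, the service is called three times instead of ten.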

@lmatz
Contributor Author

lmatz commented Sep 11, 2023

Thanks, I see. The solution can work for calling external services.

For this particular user, computation overhead is one of the primary reasons. I suppose if this is the case, we have no other way to avoid calling UDF when the data is the same?

I wonder whether compacting stream chunks before calling the UDF is enough to solve that, or whether it turns out to be a quite difficult problem?

@wangrunji0408
Contributor

wangrunji0408 commented Sep 11, 2023

Oh, you reminded me, we indeed don't compact the chunk before sending it to the UDF. I mean, if there is a chunk with capacity 1024 but cardinality 1, it will send all 1024 rows instead of only the one visible row.

I'm not sure whether most input chunks are high cardinality or low cardinality in the real world. But it's worth fixing anyway. I'll create an issue and fix it soon.
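A rough sketch of the fix, assuming a chunk is modeled as a list of rows plus a visibility list (a simplification of the real chunk layout); `eval_visible_only` and `double` are hypothetical names used only for illustration:

```python
from typing import Callable, List, Optional, Sequence


def eval_visible_only(
    rows: Sequence[int],
    visibility: Sequence[bool],
    udf: Callable[[int], int],
) -> List[Optional[int]]:
    """Call udf only for visible rows; invisible slots stay None."""
    visible_idx = [i for i, vis in enumerate(visibility) if vis]
    result: List[Optional[int]] = [None] * len(rows)
    for i in visible_idx:
        result[i] = udf(rows[i])
    return result


udf_calls = 0


def double(x: int) -> int:
    global udf_calls
    udf_calls += 1
    return x * 2


# Capacity 1024, cardinality 1: only row 7 is visible.
rows = list(range(1024))
visibility = [False] * 1024
visibility[7] = True
out = eval_visible_only(rows, visibility, double)
```

Here the UDF runs once rather than 1024 times, and the output keeps the original chunk capacity.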

@fuyufjh
Member

fuyufjh commented Sep 12, 2023

related #11070

@st1page
Contributor

st1page commented Sep 12, 2023

We have three levels of optimization to reduce the rows processed by the UDF

  1. Compact the chunk
  2. Convert the retractable stream to an upsert stream
    • If the UDF project's downstream is a materialize executor, and the user confirms that the UDF costs more than looking up the old value for each row in materialize, we can convert the stream to an upsert stream, so that the UDF is evaluated only on UpdateInsert and ignores UpdateDelete.
    • If the downstream is an upsert sink, there is no cost at all.
  3. Add a result cache for the UDF.
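As an illustration of the third level, a result cache could look like the sketch below if it lived in the user's own UDF code. This assumes the UDF returns the same result for the same arguments, and it says nothing about how an engine-side cache would be built; `expensive_udf` and `maxsize=1024` are hypothetical.

```python
from functools import lru_cache

udf_calls = 0


@lru_cache(maxsize=1024)  # hypothetical bound on the number of cached results
def expensive_udf(a: int, b: int) -> int:
    """Stand-in for an expensive computation; counts real evaluations."""
    global udf_calls
    udf_calls += 1
    return a * b


# The second call with (1, 2) is served from the cache.
results = [expensive_udf(1, 2), expensive_udf(1, 2), expensive_udf(3, 4)]
```

Three requests result in only two real evaluations.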

@lmatz
Contributor Author

lmatz commented Sep 12, 2023

If the downstream is an upsert sink, there is no costs at all.

As long as a UDF or even multiple UDFs are used in a create sink streaming job, the UDF will not process either UpdateInsert or UpdateDelete if these two rows contain the same data?

when user ensures that their UDF cost is higher than the lookup old value for each row in materialize

Is it a knob that users need to turn on/off when creating a materialized view?

Add a result cache for the UDF.

Is it completely managed inside users' UDF code?

I guess I just failed to understand why the following approach is not taken, and what limitation prevents it:

Alternatively, we can partially achieve this while preserving event orders - the solution is trivial: just hide the update with same before & after values.

For example, set vis = false for these:

U- 1 2 'foo'
U+ 1 2 'foo'

but not these:

- 1 2 'foo'
... optionally, some other rows ...
+ 1 2 'foo'

My understanding is that if this approach is taken, the original intention of this issue can be achieved. Is that right?
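A minimal sketch of that approach, assuming U- and U+ always appear as adjacent rows in a chunk (the assumption asked about earlier in this thread). The `(op, values)` row encoding and the name `hide_noop_updates` are hypothetical:

```python
from typing import List, Sequence, Tuple

Row = Tuple[str, tuple]  # (op, values), op is one of "+", "-", "U-", "U+"


def hide_noop_updates(chunk: Sequence[Row], vis: Sequence[bool]) -> List[bool]:
    """Return a new visibility list that hides adjacent U-/U+ pairs
    whose before and after values are identical."""
    new_vis = list(vis)
    for i in range(len(chunk) - 1):
        (op_a, row_a), (op_b, row_b) = chunk[i], chunk[i + 1]
        is_pair = op_a == "U-" and op_b == "U+"
        if is_pair and row_a == row_b and new_vis[i] and new_vis[i + 1]:
            new_vis[i] = new_vis[i + 1] = False
    return new_vis


chunk = [
    ("U-", (1, 2, "foo")),
    ("U+", (1, 2, "foo")),  # no-op update: hidden together with the U- above
    ("-", (1, 2, "foo")),
    ("+", (9, 9, "bar")),
    ("+", (1, 2, "foo")),   # delete + insert is left untouched
]
new_vis = hide_noop_updates(chunk, [True] * len(chunk))
```

Only the first two rows are hidden; the separate delete and insert of the same values stay visible, so event order is preserved.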

@fuyufjh
Member

fuyufjh commented Sep 13, 2023

After some consideration, I prefer to treat this as a brand-new issue. Sorry for bringing up #11070...

The cost of UDF call is much much higher than built-in functions. I think we can do more specific optimizations here:

  1. Compact the chunk: Do not process the rows with false visibility. However, #11070 (feat(stream): compact stream chunk by dropping useless rows in sink) will focus on Sink only, and that approach should not be used during streaming, so it cannot be applied here. See more explanation there.
  2. Only the rows that update the columns referenced by the UDF should be evaluated. For example, given UDF(column_1, column_2) -> column_5 and the incoming Update event only updates column_3, then we don't need to call the UDF, but just keep the original value of column_5.
  3. (Mentioned here by @st1page and I agree +1) Only call the UDF for Insert and UpdateInsert, and put a null in the value part for delete rows. After all, UDFs are not immutable and not pure, so evaluating them for delete rows doesn't actually solve any problem.
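The check behind item 2 can be sketched as follows, assuming rows are tuples and the UDF's referenced columns are known as zero-based indices. `update_touches_udf_inputs` is a hypothetical helper, not an existing function:

```python
from typing import Sequence


def update_touches_udf_inputs(
    old_row: Sequence[object],
    new_row: Sequence[object],
    referenced_cols: Sequence[int],
) -> bool:
    """True if the update changes any column the UDF reads."""
    return any(old_row[c] != new_row[c] for c in referenced_cols)


# UDF(column_1, column_2) reads indices 0 and 1.
referenced = [0, 1]

# Only column_3 (index 2) changes: the UDF result can be kept as is.
only_col3 = update_touches_udf_inputs((1, 2, "a"), (1, 2, "b"), referenced)

# column_1 changes: the UDF has to be evaluated again.
col1_changed = update_touches_udf_inputs((1, 2, "a"), (5, 2, "a"), referenced)
```

Keeping the original value of column_5 still requires that value to be available somewhere, which this helper does not address.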

@fuyufjh
Member

fuyufjh commented Sep 13, 2023

@wangrunji0408 Can you please take this?

@wangrunji0408 wangrunji0408 self-assigned this Sep 13, 2023
@lmatz
Contributor Author

lmatz commented Sep 14, 2023

Only the rows that update the columns referenced by the UDF should be evaluated. For example, given UDF(column_1, column_2) -> column_5 and the incoming Update event only updates column_3, then we don't need to call the UDF, but just keep the original value of column_5.

This is going to cover the original intention of the issue and do even more.

@BugenZhao
Member

2. Only the rows that update the columns referenced by the UDF should be evaluated. For example, given UDF(column_1, column_2) -> column_5 and the incoming Update event only updates column_3, then we don't need to call the UDF, but just keep the original value of column_5.

Are we actually assuming the UDF to be pure in this way?

@fuyufjh
Member

fuyufjh commented Sep 14, 2023

  2. Only the rows that update the columns referenced by the UDF should be evaluated. For example, given UDF(column_1, column_2) -> column_5 and the incoming Update event only updates column_3, then we don't need to call the UDF, but just keep the original value of column_5.

Are we actually assuming the UDF to be pure in this way?

Even for non-pure (aka. non-deterministic) functions, this still makes sense to me 🤔

From my understanding, most non-deterministic functions are not intended to be non-deterministic in the way NOW() is. Taking an external lookup UDF get_detailed_user_info(user_id) as an example, the user wants to append more columns to their data stream, and if user_id remains unchanged, I think it should not look up this user again, just like HashJoin.

@fuyufjh
Member

fuyufjh commented Sep 14, 2023

Noted down the conclusion of the huddle just now:

#12201 (comment)

Will do 1 & 2, but not 3.

@wangrunji0408
Contributor

  3. (Mentioned here by @st1page and I agree +1) Only calls UDF for Insert and UpdateInsert, and put a null on the value part.
    Convert the retractable stream to an upsert stream
    If the UDFproject's downstream is a materialize executor, when user ensures that their UDF cost is higher than the lookup old value for each row in materialize, we can convert the stream to upsert stream, so that the UDF will only processed on "updateInsert" and ignore the "UpdateDelete"

Random idea: Should we introduce a new streaming operation Upsert to represent upsert stream? IIUC, we can't only keep UpdateInsert and remove UpdateDelete since they are assumed paired, right? Otherwise we have to find another way to tell the UDF to ignore the UpdateDelete.

Contributor

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.

@stdrc
Member

stdrc commented Oct 17, 2024

IIUC, if we introduce something like a MaterializedProject to do UDFs, we get the following benefits, right?

  1. For updates, we only need to call UDF for the UpdateInsert.
  2. If none of the arguments have changed, we don't need to call the UDF again.

This actually reminds me of the design of general OverWindow, in which we already achieved (1), and we plan to do the optimization (2) (done by #19056).
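To make the two benefits concrete, here is a toy model of such a stateful operator. It is only a sketch under assumptions: the class name, the in-memory dict used as state, and the `upsert`/`delete` methods are all hypothetical and do not describe an actual executor.

```python
from typing import Any, Callable, Dict, Hashable, Optional, Tuple


class MaterializedProjectSketch:
    """Keeps (args, result) per key so updates and deletes can reuse results."""

    def __init__(self, udf: Callable[..., Any]) -> None:
        self.udf = udf
        self.state: Dict[Hashable, Tuple[tuple, Any]] = {}
        self.udf_calls = 0

    def upsert(self, key: Hashable, args: tuple) -> Any:
        cached = self.state.get(key)
        if cached is not None and cached[0] == args:
            return cached[1]  # benefit 2: arguments unchanged, no UDF call
        self.udf_calls += 1   # benefit 1: evaluate for the new value only
        result = self.udf(*args)
        self.state[key] = (args, result)
        return result

    def delete(self, key: Hashable) -> Optional[Any]:
        # The old result comes from state, so deletes never call the UDF.
        entry = self.state.pop(key, None)
        return None if entry is None else entry[1]


op = MaterializedProjectSketch(lambda a, b: a + b)
first = op.upsert(1, (1, 2))      # new key: one UDF call
unchanged = op.upsert(1, (1, 2))  # same arguments: served from state
changed = op.upsert(1, (1, 5))    # arguments changed: second UDF call
removed = op.delete(1)            # returns the stored result without a call
```

The trade-off is the extra state kept per key, which is the cost compared against the UDF cost earlier in this thread.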

@stdrc stdrc modified the milestones: release-2.2, release-2.3 Jan 13, 2025

7 participants