Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add item ID to the workqueue instead #165

Closed
wants to merge 18 commits into from
Closed

Add item ID to the workqueue instead #165

wants to merge 18 commits into from

Conversation

pingsutw
Copy link
Member

TL;DR

I found that Propeller keeps sending the request to the agent even workflow is done.

The work queue only has unique items, so we add itemID to it. The workers won't process the item again after the task is done.

Before:

enqueueBatches workqueue sync (10 workers)
t1 obj(1) obj(1)
t2
t3 obj(2) obj(1), obj(2)
t4
t5 obj(1) obj(1), obj(2), obj(1)
t6 obj(1), obj(2), obj(1)

After:

enqueueBatches workqueue sync (10 workers)
t1 1 1
t2
t3 2 1, 2
t4
t5 1 1, 2
t6 1, 2

100 workflows, each has 100 tasks.
Before:
image

After:
image

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

^^^

Tracking Issue

NA

Follow-up issue

NA

Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
@pingsutw pingsutw marked this pull request as draft September 21, 2023 05:23
@kumare3
Copy link
Contributor

kumare3 commented Sep 21, 2023

fix itemId -> itemID

Also, in the worst case would just one more call to Get would done?

@@ -235,7 +235,8 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {

for _, batch := range batches {
b := batch
w.workqueue.Add(&b)
logger.Debugf(ctx, "Enqueuing batch with id: %v", b[0].GetID())
w.workqueue.Add(b[0].GetID())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that batches only have one item... I don't think this is true in all usage of AutoRefresh cache... if it's, I would rather deprecate the whole batching mechanism...
This implementation won't work for any batches > 1 item.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In flytepropeller, batches always only have one item.

func SingleItemBatches(_ context.Context, snapshot []ItemWrapper) (batches []Batch, err error) {
res := make([]Batch, 0, len(snapshot))
for _, item := range snapshot {
res = append(res, Batch{item})
}
return res, nil
}

Will clean up the code.

Signed-off-by: Kevin Su <[email protected]>
@codecov
Copy link

codecov bot commented Sep 25, 2023

Codecov Report

Attention: 5 lines in your changes are missing coverage. Please review.

Comparison is base (8f40575) 67.83% compared to head (07e6f22) 67.79%.

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #165      +/-   ##
==========================================
- Coverage   67.83%   67.79%   -0.04%     
==========================================
  Files          69       69              
  Lines        4113     4124      +11     
==========================================
+ Hits         2790     2796       +6     
- Misses       1159     1163       +4     
- Partials      164      165       +1     
Flag Coverage Δ
unittests 67.79% <72.22%> (-0.04%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
cache/auto_refresh.go 74.23% <72.22%> (-1.43%) ⬇️

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
@pingsutw pingsutw marked this pull request as ready for review September 26, 2023 07:48
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
@eapolinario
Copy link
Contributor

Hi, we are moving all Flyte development to a monorepo. In order to help the transition period, we're moving open PRs to monorepo automatically and your PR was moved to flyteorg/flyte#4138. Notice that if there are any conflicts in the resulting PR they most likely happen due to the change in the import path of the flyte components.

@eapolinario eapolinario closed this Oct 3, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants