[WIP] finalizer approach to fix race condition due to pruning in results watcher #703
base: main
@@ -16,6 +16,7 @@ package dynamic

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
@@ -48,6 +49,8 @@ var (
    clock = clockwork.NewRealClock()
)

const LogFinalizer = "results.tekton.dev/streaming-logs"

// Reconciler implements common reconciler behavior across different Tekton Run
// Object types.
type Reconciler struct {
@@ -81,8 +84,10 @@ func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient
        resultsClient: results.NewClient(rc, lc),
        objectClient:  oc,
        cfg:           cfg,
        // Block deletion while the streaming-logs finalizer is still present.
        IsReadyForDeletionFunc: func(ctx context.Context, object results.Object) (bool, error) {
            if LogsFinalizerExist(object, LogFinalizer) {
                return false, nil
            }
            return true, nil
        },
    }
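For illustration, a standalone sketch (not the watcher's actual pruning code; the objects here are bare ObjectMeta values and the helper name is illustrative) of the behaviour this predicate encodes: a run only becomes ready for pruning once the streaming-logs finalizer is gone.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const logFinalizer = "results.tekton.dev/streaming-logs"

// readyForDeletion mirrors the predicate wired into NewDynamicReconciler:
// an object may be pruned only once the streaming-logs finalizer is gone.
func readyForDeletion(o metav1.Object) bool {
    for _, f := range o.GetFinalizers() {
        if f == logFinalizer {
            return false
        }
    }
    return true
}

func main() {
    streaming := &metav1.ObjectMeta{Name: "taskrun-1", Finalizers: []string{logFinalizer}}
    finished := &metav1.ObjectMeta{Name: "taskrun-2"}

    fmt.Println(readyForDeletion(streaming)) // false: logs still streaming, pruning must wait
    fmt.Println(readyForDeletion(finished))  // true: finalizer removed, safe to prune
}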
@@ -114,6 +119,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {

    // Update logs if enabled.
    if r.resultsClient.LogsClient != nil {
        // Add the finalizer if the object has never been reconciled before:
        // once the object carries the results log annotation, the log has already been sent or is streaming.
        annotations := o.GetAnnotations()
        if _, exists := annotations[annotation.Log]; !exists {
            err = r.AddFinalizer(ctx, o)
            if err != nil {
                return err
            }
        }
        if err := r.sendLog(ctx, o); err != nil {
            logger.Errorw("Error sending log",
                zap.String("namespace", o.GetNamespace()),
@@ -342,11 +356,6 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
                    zap.Error(err),
                )
            }
            logger.Debugw("Streaming log completed",
                zap.String("namespace", o.GetNamespace()),
                zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
                zap.String("name", o.GetName()),
            )
        }()
    }

Review comment (on the removed logger.Debugw call): this asynchronous function never reaches this point when log streaming succeeds, while we do see this message when there is an error while streaming logs.
@@ -388,19 +397,65 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
        return fmt.Errorf("error reading from tkn reader: %w", err)
    }

    errChanRepeater := make(chan error)
    go func(echan <-chan error, o metav1.Object) {
        writeErr := <-echan
        errChanRepeater <- writeErr
    errChanRepeater := make(chan error, 100)

        _, err := writer.Flush()
        if err != nil {
            logger.Error(err)
    // logctx is derived from ctx. Therefore, if ctx is cancelled (either explicitly through a call to its cancel
    // function or when it reaches its deadline), logctx will be cancelled automatically.
    // TODO: Implement configurable timeout based on user feedback analysis.
    logctx, _ := context.WithTimeout(ctx, 10*time.Minute)
    go func(ctx context.Context, echan <-chan error, o metav1.Object) {
        defer close(errChanRepeater)
Review thread on defer close(errChanRepeater):
- "Good use of defer. @ramessesii2 @sayan-biswas, where are we closing the original ..."
- "logChan and errChan are the receiver ends. When the channels are no longer used, they will be garbage collected."
- "Yeah, there was a bit of back and forth on that thread @ramessesii2, but yes, I think that is the end interpretation. Good find / research @ramessesii2. That said, given the exchanges I just saw with @pmacik today in Slack, I should probably make a more thorough pass of the changes here; admittedly I only focused on the mem leak bit initially. Will do that today, thanks."
- "OK @ramessesii2 @sayan-biswas, I think I looked at this too quickly yesterday ... the close of the ... move the ..."
- "The reason why I'm deferring close(errChanRepeater) in this goroutine is that it's important to tell the receiving goroutines that all data have been sent (which is here: ...); otherwise the above code will not exit execution if we don't close errChanRepeater before we do a tknlog.NewWriter()... I confirmed this as well while debugging."
- "Yep, we are good here per the thread below."
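A standalone sketch of the point made in this thread: a consumer draining a channel, as the tkn writer does with errChanRepeater, only returns once that channel is closed, which is why the producer goroutine defers the close. The names below are illustrative, not taken from the watcher code.

package main

import "fmt"

// consume drains errs until it is closed, similar in spirit to the tkn writer
// reading errChanRepeater: without a close, the range never terminates.
func consume(errs <-chan error) {
    for err := range errs {
        fmt.Println("consumer saw:", err)
    }
    fmt.Println("consumer exited because the channel was closed")
}

func main() {
    errs := make(chan error, 100)

    go func() {
        defer close(errs) // remove this close and consume() blocks forever
        errs <- fmt.Errorf("example error forwarded from the log reader")
    }()

    consume(errs)
}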
        select {
        case <-ctx.Done():
            logger.Warnw("Context done streaming log",
                zap.String("namespace", o.GetNamespace()),
                zap.String("name", o.GetName()),
            )
        case writeErr := <-echan:
Review thread on case writeErr := <-echan:
- "To close the loop @ramessesii2 on your comment https://github.com/tektoncd/results/pull/703/files#r1486049142: if you are only closing the channel in the goroutine, what unblocks this call? Are we always hitting the ..."
- "Hey Gabe, my understanding is based on the fact that even if echan is closed by the tkn reader, reading a closed channel will always succeed, returning the zero value."
- "Neither the Tekton reader nor the writer is closing this channel based on the code I've seen. @ramessesii2, can you provide the code link, other than this goroutine, where this channel is being closed?"
- "Nevermind, I found it :-)"
- "Sure, the flow is tknReader.read() -> readPipelineLog() -> finally landing in readAvailablePipelineLogs() for our case, since we only stream logs once PipelineRuns/TRs have a specific closed state -> close echan (errC)."
- "Not sure how the original echan could ever be nil, but in general defensive programming is good :-) I'll wait for your update and then we can go from there - thanks."
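A standalone sketch of the closed-channel behaviour referenced above: once the reader closes echan, a receive no longer blocks and yields the zero value, with the comma-ok form reporting that no real send happened.

package main

import "fmt"

func main() {
    echan := make(chan error)
    close(echan)

    // A receive from a closed channel does not block; it yields the zero
    // value, and ok is false because no real send produced it.
    err, ok := <-echan
    fmt.Println(err, ok) // prints "<nil> false"
}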
            errChanRepeater <- writeErr
        }
        var gofuncerr error
        _, gofuncerr = writer.Flush()
        if gofuncerr != nil {
            logger.Error(gofuncerr)
        }
        logger.Info("Flush in streamLogs done",
            zap.String("namespace", o.GetNamespace()),
            zap.String("name", o.GetName()),
        )
        if err = logsClient.CloseSend(); err != nil {
            logger.Error(err)
        }
    }(errChan, o)
        logger.Info("Gofunc in streamLogs done",
            zap.String("namespace", o.GetNamespace()),
            zap.String("name", o.GetName()),
        )
        err = r.RemoveFinalizer(ctx, o)
        if err != nil {
            logger.Errorw("Error removing finalizer",
                zap.String("namespace", o.GetNamespace()),
                zap.String("name", o.GetName()),
                zap.Error(err),
            )
            return
        }

        logger.Info("Finalizer removed successfully",
            zap.String("namespace", o.GetNamespace()),
            zap.String("name", o.GetName()),
        )

        if o.GetDeletionTimestamp() != nil {
            err = controller.NewRequeueImmediately()
            logger.Errorw("Error requeuing object for deletion",
                zap.String("namespace", o.GetNamespace()),
                zap.String("name", o.GetName()),
                zap.Error(err),
            )
        }

    }(logctx, errChan, o)

    // errChanRepeater receives stderr from the TaskRun containers.
    // This will be forwarded as combined output (stdout and stderr)
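One aside on the logctx, _ := context.WithTimeout(ctx, 10*time.Minute) line in the hunk above: discarding the CancelFunc is what go vet's lostcancel check flags, because the timeout's resources are only released when the timer fires or the parent context is cancelled. A hedged alternative, shown as a standalone sketch rather than as a change to the PR, is to hand the cancel function to the goroutine that owns the derived context and release it when streaming finishes.

package main

import (
    "context"
    "fmt"
    "time"
)

// streamLogs stands in for the real log streaming and respects ctx much like
// the watcher's goroutine does. All names here are illustrative.
func streamLogs(ctx context.Context) {
    select {
    case <-time.After(50 * time.Millisecond):
        fmt.Println("streaming finished")
    case <-ctx.Done():
        fmt.Println("context done:", ctx.Err())
    }
}

func main() {
    logctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)

    done := make(chan struct{})
    go func(ctx context.Context) {
        defer close(done)
        defer cancel() // release the timeout's resources as soon as streaming ends
        streamLogs(ctx)
    }(logctx)

    <-done
}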
@@ -410,5 +465,86 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
        Err: writer,
    }, logChan, errChanRepeater)

    logger.Info("Exiting streaming logs",
        zap.String("namespace", o.GetNamespace()),
        zap.String("name", o.GetName()),
    )
    return nil
}
||
func (r *Reconciler) AddFinalizer(ctx context.Context, o metav1.Object) error { | ||
if ownerReferences := o.GetOwnerReferences(); len(ownerReferences) > 0 { | ||
// do not add finalizer if the object is owned by a PipelineRun object | ||
for _, or := range ownerReferences { | ||
if or.Kind == "PipelineRun" { | ||
return nil | ||
} | ||
} | ||
} | ||
finalizers := o.GetFinalizers() | ||
for _, f := range finalizers { | ||
if f == LogFinalizer { | ||
return nil | ||
} | ||
} | ||
finalizers = append(finalizers, LogFinalizer) | ||
|
||
patch, err := finalizerPatch(finalizers) | ||
if err != nil { | ||
return fmt.Errorf("error adding results log finalizer: %w", err) | ||
} | ||
|
||
if err = r.objectClient.Patch(ctx, o.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}); err != nil { | ||
return fmt.Errorf("error patching object: %w", err) | ||
} | ||
return nil | ||
} | ||
|
||
func (r *Reconciler) RemoveFinalizer(ctx context.Context, o metav1.Object) error {
    finalizers := o.GetFinalizers()
    for i, f := range finalizers {
        if f == LogFinalizer {
            finalizers = append(finalizers[:i], finalizers[i+1:]...)
            patch, err := finalizerPatch(finalizers)
            if err != nil {
                return fmt.Errorf("error removing results log finalizer: %w", err)
            }

            if err = r.objectClient.Patch(ctx, o.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
                return fmt.Errorf("error patching object: %w", err)
            }
            return nil
        }
    }
    return nil
}
func LogsFinalizerExist(o metav1.Object, finalizer string) bool {
    finalizers := o.GetFinalizers()
    for _, f := range finalizers {
        if f == finalizer {
            return true
        }
    }
    return false
}
type mergePatch struct {
    Metadata metadata `json:"metadata"`
}

type metadata struct {
    Finalizer []string `json:"finalizers"`
}

func finalizerPatch(finalizers []string) ([]byte, error) {
    data := mergePatch{
        Metadata: metadata{
            Finalizer: []string{},
        },
    }
    data.Metadata.Finalizer = finalizers

    return json.Marshal(data)
} |
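For reference, a standalone sketch showing the JSON merge patch payload finalizerPatch produces (the type and function definitions are copied from the diff above; only the main function is new). With types.MergePatchType the finalizers list in the patch replaces the list on the object, which is why RemoveFinalizer sends the already-filtered slice.

package main

import (
    "encoding/json"
    "fmt"
)

type mergePatch struct {
    Metadata metadata `json:"metadata"`
}

type metadata struct {
    Finalizer []string `json:"finalizers"`
}

func finalizerPatch(finalizers []string) ([]byte, error) {
    data := mergePatch{
        Metadata: metadata{
            Finalizer: []string{},
        },
    }
    data.Metadata.Finalizer = finalizers

    return json.Marshal(data)
}

func main() {
    add, _ := finalizerPatch([]string{"results.tekton.dev/streaming-logs"})
    remove, _ := finalizerPatch([]string{})

    fmt.Println(string(add))    // {"metadata":{"finalizers":["results.tekton.dev/streaming-logs"]}}
    fmt.Println(string(remove)) // {"metadata":{"finalizers":[]}}
}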
Review thread on the deletion handling:
- "I would think that this is not needed, as it is redundant with the finalizer pattern. I would expect the resource to be marked as being deleted, but the deletion to be deferred by k8s until the last finalizer has been removed. I could definitely be wrong though 😉"
- "You're right, deletion is deferred if the finalizer is there. However, prior to that change we could see the following message while debugging even though the resource is not deleted properly (due to the finalizer), which is misleading, and more importantly we do go on to execute AfterDeletion."
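A standalone sketch of the behaviour described in this thread, using only apimachinery types (in a real cluster the API server, not the code, sets deletionTimestamp): while a finalizer is present, a delete request only marks the object, and it stays visible until the last finalizer is removed.

package main

import (
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const logFinalizer = "results.tekton.dev/streaming-logs"

// deletionPending reports whether a delete has been requested but is being
// held back by finalizers still present on the object.
func deletionPending(o metav1.Object) bool {
    return o.GetDeletionTimestamp() != nil && len(o.GetFinalizers()) > 0
}

func main() {
    now := metav1.NewTime(time.Now())
    tr := &metav1.ObjectMeta{
        Name:              "taskrun-1",
        DeletionTimestamp: &now,                   // delete was requested (e.g. by the pruner)
        Finalizers:        []string{logFinalizer}, // but log streaming has not finished yet
    }

    fmt.Println(deletionPending(tr)) // true: the object is kept until the finalizer is removed
}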