diff --git a/src/crd-controller/HostedServices/V1Alpha2Controller.cs b/src/crd-controller/HostedServices/V1Alpha2Controller.cs index 3107217d..e71e0d22 100644 --- a/src/crd-controller/HostedServices/V1Alpha2Controller.cs +++ b/src/crd-controller/HostedServices/V1Alpha2Controller.cs @@ -53,6 +53,7 @@ private IDisposable ObserveKamusSecret(CancellationToken token) ApiVersion, "kamussecrets", token) + .Take(1) .SelectMany(x => Observable.FromAsync(async () => await HandleEvent(x.Item1, x.Item2)) ) @@ -69,13 +70,15 @@ private IDisposable ObserveKamusSecret(CancellationToken token) Environment.Exit(0); }); } + public Task StartAsync(CancellationToken token) { mSubscription = ObserveKamusSecret(token); Observable.Interval(TimeSpan.FromSeconds(mReconciliationIntervalInSeconds)).Subscribe((s) => { - mSubscription.Dispose(); + var oldSubscription = mSubscription; mSubscription = ObserveKamusSecret(token); + oldSubscription.Dispose(); }); mLogger.Information("Starting watch for KamusSecret V1Alpha2 events"); diff --git a/src/crd-controller/crd-controller.csproj b/src/crd-controller/crd-controller.csproj index df194490..05c6e25f 100644 --- a/src/crd-controller/crd-controller.csproj +++ b/src/crd-controller/crd-controller.csproj @@ -7,7 +7,7 @@ - 0.9.0.7 + 0.9.0.8 diff --git a/src/crd-controller/utils/KubernetesExtensions.cs b/src/crd-controller/utils/KubernetesExtensions.cs index 10647dbc..f36ae9ac 100644 --- a/src/crd-controller/utils/KubernetesExtensions.cs +++ b/src/crd-controller/utils/KubernetesExtensions.cs @@ -13,22 +13,24 @@ public static class KubernetesExtensions string version, string plural, CancellationToken cancellationToken - ) where TCRD : class + ) where TCRD : class { + Watcher watcher = null; return Observable.FromAsync(async () => - { - var subject = new System.Reactive.Subjects.Subject<(WatchEventType, TCRD)>(); - var path = $"apis/{group}/{version}/watch/{plural}"; - await kubernetes.WatchObjectAsync(path, - timeoutSeconds: int.MaxValue, - onEvent: (@type, @event) => subject.OnNext((@type, @event)), - onError: e => subject.OnError(e), - onClosed: () => subject.OnCompleted(), cancellationToken: cancellationToken); - return subject; - }) + { + var subject = new System.Reactive.Subjects.Subject<(WatchEventType, TCRD)>(); + var path = $"apis/{group}/{version}/watch/{plural}"; + watcher = await kubernetes.WatchObjectAsync(path, + timeoutSeconds: int.MaxValue, + onEvent: (@type, @event) => subject.OnNext((@type, @event)), + onError: e => subject.OnError(e), + onClosed: () => subject.OnCompleted(), cancellationToken: cancellationToken); + return subject; + }) .SelectMany(x => x) .Select(t => (t.Item1, t.Item2 as TCRD)) - .Where(t => t.Item2 != null); + .Where(t => t.Item2 != null) + .Finally(() => watcher?.Dispose()); } } } diff --git a/src/decrypt-api/decrypt-api.csproj b/src/decrypt-api/decrypt-api.csproj index 9d81760a..aa2ae7c7 100644 --- a/src/decrypt-api/decrypt-api.csproj +++ b/src/decrypt-api/decrypt-api.csproj @@ -3,7 +3,7 @@ netcoreapp3.1 - 0.9.0.7 + 0.9.0.8 diff --git a/src/encrypt-api/encrypt-api.csproj b/src/encrypt-api/encrypt-api.csproj index 8e1eba6f..e789e663 100644 --- a/src/encrypt-api/encrypt-api.csproj +++ b/src/encrypt-api/encrypt-api.csproj @@ -3,7 +3,7 @@ netcoreapp3.1 - 0.9.0.7 + 0.9.0.8