From a6f798737c8870e22e883a62eb0661cfa5c8856c Mon Sep 17 00:00:00 2001 From: Alexey Skalozub Date: Thu, 23 Feb 2017 20:53:57 +0300 Subject: [PATCH] New feature: FollowJobRetentionPolicy to expire consoles along with parent job --- README.md | 1 + appveyor.yml | 2 +- src/Hangfire.Console/ConsoleOptions.cs | 6 + .../GlobalConfigurationExtensions.cs | 5 + src/Hangfire.Console/Server/ConsoleContext.cs | 15 ++ .../Server/ConsoleServerFilter.cs | 20 +- .../States/ConsoleApplyStateFilter.cs | 63 ++++++ .../Storage/ConsoleExpirationTransaction.cs | 54 +++++ .../Storage/ConsoleIdExtensions.cs | 26 +++ .../Storage/ConsoleStorage.cs | 64 +++--- .../Storage/IConsoleExpirationTransaction.cs | 21 ++ .../Storage/IConsoleStorage.cs | 17 +- src/Hangfire.Console/project.json | 4 +- .../Server/ConsoleContextFacts.cs | 38 ++++ .../Server/ConsoleServerFilterFacts.cs | 49 +++-- .../States/ConsoleApplyStateFilterFacts.cs | 207 ++++++++++++++++++ .../ConsoleExpirationTransactionFacts.cs | 86 ++++++++ .../Storage/ConsoleStorageFacts.cs | 89 ++++++-- 18 files changed, 680 insertions(+), 87 deletions(-) create mode 100644 src/Hangfire.Console/States/ConsoleApplyStateFilter.cs create mode 100644 src/Hangfire.Console/Storage/ConsoleExpirationTransaction.cs create mode 100644 src/Hangfire.Console/Storage/ConsoleIdExtensions.cs create mode 100644 src/Hangfire.Console/Storage/IConsoleExpirationTransaction.cs create mode 100644 tests/Hangfire.Console.Tests/States/ConsoleApplyStateFilterFacts.cs create mode 100644 tests/Hangfire.Console.Tests/Storage/ConsoleExpirationTransactionFacts.cs diff --git a/README.md b/README.md index b862729..2ed0c6c 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ As usual, you may provide additional options for `UseConsole()` method. Here's what you can configure: - **ExpireIn** – time to keep console sessions (default: 24 hours) +- **FollowJobRetentionPolicy** – expire all console sessions along with parent job (default: true) - **PollInterval** – poll interval for live updates, ms (default: 1000) - **BackgroundColor** – console background color (default: #0d3163) - **TextColor** – console default text color (default: #ffffff) diff --git a/appveyor.yml b/appveyor.yml index bc2f723..8ad0af5 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,4 +1,4 @@ -version: 1.2.1-{build} +version: 1.3.0-{build} os: Visual Studio 2015 diff --git a/src/Hangfire.Console/ConsoleOptions.cs b/src/Hangfire.Console/ConsoleOptions.cs index b4feff5..98d1a55 100644 --- a/src/Hangfire.Console/ConsoleOptions.cs +++ b/src/Hangfire.Console/ConsoleOptions.cs @@ -12,6 +12,12 @@ public class ConsoleOptions /// public TimeSpan ExpireIn { get; set; } = TimeSpan.FromDays(1); + /// + /// Gets or sets if console messages should follow the same retention policy as the parent job. + /// When set to true, parameter is ignored. + /// + public bool FollowJobRetentionPolicy { get; set; } = true; + /// /// Gets or sets console poll interval (in ms). /// diff --git a/src/Hangfire.Console/GlobalConfigurationExtensions.cs b/src/Hangfire.Console/GlobalConfigurationExtensions.cs index d34927f..36aa312 100644 --- a/src/Hangfire.Console/GlobalConfigurationExtensions.cs +++ b/src/Hangfire.Console/GlobalConfigurationExtensions.cs @@ -1,5 +1,6 @@ using Hangfire.Console.Dashboard; using Hangfire.Console.Server; +using Hangfire.Console.States; using Hangfire.Dashboard; using Hangfire.Dashboard.Extensions; using Hangfire.States; @@ -30,6 +31,10 @@ public static IGlobalConfiguration UseConsole(this IGlobalConfiguration configur // register server filter for jobs GlobalJobFilters.Filters.Add(new ConsoleServerFilter(options)); + // register apply state filter for jobs + // (context may be altered by other state filters, so make it the very last filter in chain to use final context values) + GlobalJobFilters.Filters.Add(new ConsoleApplyStateFilter(options), int.MaxValue); + // replace renderer for Processing state JobHistoryRenderer.Register(ProcessingState.StateName, new ProcessingStateRenderer(options).Render); diff --git a/src/Hangfire.Console/Server/ConsoleContext.cs b/src/Hangfire.Console/Server/ConsoleContext.cs index 28e020f..264b3d6 100644 --- a/src/Hangfire.Console/Server/ConsoleContext.cs +++ b/src/Hangfire.Console/Server/ConsoleContext.cs @@ -44,6 +44,8 @@ public ConsoleContext(ConsoleId consoleId, IConsoleStorage storage) _lastTimeOffset = 0; _nextProgressBarId = 0; + + _storage.InitConsole(_consoleId); } public ConsoleTextColor TextColor { get; set; } @@ -87,5 +89,18 @@ public void Expire(TimeSpan expireIn) { _storage.Expire(_consoleId, expireIn); } + + public void FixExpiration() + { + TimeSpan ttl = _storage.GetConsoleTtl(_consoleId); + if (ttl < TimeSpan.Zero) + { + // ConsoleApplyStateFilter not called yet, or current job state is not final. + // Either way, there's no need to expire console here. + return; + } + + _storage.Expire(_consoleId, ttl); + } } } diff --git a/src/Hangfire.Console/Server/ConsoleServerFilter.cs b/src/Hangfire.Console/Server/ConsoleServerFilter.cs index f73a0f1..a12ab2b 100644 --- a/src/Hangfire.Console/Server/ConsoleServerFilter.cs +++ b/src/Hangfire.Console/Server/ConsoleServerFilter.cs @@ -47,14 +47,22 @@ public void OnPerforming(PerformingContext filterContext) public void OnPerformed(PerformedContext filterContext) { - if (filterContext.Canceled) + if (_options.FollowJobRetentionPolicy) { - // Processing was been cancelled by one of the job filters - // There's nothing to do here, as processing hasn't started - return; - } + // Console sessions follow parent job expiration. + // Normally, ConsoleApplyStateFilter will update expiration for all consoles, no extra work needed. + + // If the running job is deleted from the Dashboard, ConsoleApplyStateFilter will be called immediately, + // but the job will continue running, unless IJobCancellationToken.ThrowIfCancellationRequested() is met. + // If anything is written to console after the job was deleted, it won't get a correct expiration assigned. - ConsoleContext.FromPerformContext(filterContext)?.Expire(_options.ExpireIn); + // Need to re-apply expiration to prevent those records from becoming eternal garbage. + ConsoleContext.FromPerformContext(filterContext)?.FixExpiration(); + } + else + { + ConsoleContext.FromPerformContext(filterContext)?.Expire(_options.ExpireIn); + } } } } diff --git a/src/Hangfire.Console/States/ConsoleApplyStateFilter.cs b/src/Hangfire.Console/States/ConsoleApplyStateFilter.cs new file mode 100644 index 0000000..40f4549 --- /dev/null +++ b/src/Hangfire.Console/States/ConsoleApplyStateFilter.cs @@ -0,0 +1,63 @@ +using Hangfire.States; +using System; +using System.Linq; +using Hangfire.Storage; +using Hangfire.Console.Serialization; +using Hangfire.Common; +using Hangfire.Console.Storage; + +namespace Hangfire.Console.States +{ + internal class ConsoleApplyStateFilter : IApplyStateFilter + { + private readonly ConsoleOptions _options; + + public ConsoleApplyStateFilter(ConsoleOptions options) + { + if (options == null) + throw new ArgumentNullException(nameof(options)); + + _options = options; + } + + public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction) + { + if (!_options.FollowJobRetentionPolicy) + { + // Console sessions use their own expiration timeout. + // Do not expire here, will be expired by ConsoleServerFilter. + return; + } + + var jobDetails = context.Storage.GetMonitoringApi().JobDetails(context.BackgroundJob.Id); + if (jobDetails == null || jobDetails.History == null) + { + // WTF?! + return; + } + + var expiration = new ConsoleExpirationTransaction((JobStorageTransaction)transaction); + + foreach (var state in jobDetails.History.Where(x => x.StateName == ProcessingState.StateName)) + { + var consoleId = new ConsoleId(context.BackgroundJob.Id, JobHelper.DeserializeDateTime(state.Data["StartedAt"])); + + if (context.NewState.IsFinal) + { + // Job in final state is a subject for expiration. + // To keep storage clean, its console sessions should also be expired. + expiration.Expire(consoleId, context.JobExpirationTimeout); + } + else + { + // Job will be persisted, so should its console sessions. + expiration.Persist(consoleId); + } + } + } + + public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction) + { + } + } +} diff --git a/src/Hangfire.Console/Storage/ConsoleExpirationTransaction.cs b/src/Hangfire.Console/Storage/ConsoleExpirationTransaction.cs new file mode 100644 index 0000000..d2e7d77 --- /dev/null +++ b/src/Hangfire.Console/Storage/ConsoleExpirationTransaction.cs @@ -0,0 +1,54 @@ +using System; +using Hangfire.Console.Serialization; +using Hangfire.Storage; + +namespace Hangfire.Console.Storage +{ + internal class ConsoleExpirationTransaction : IConsoleExpirationTransaction + { + private readonly JobStorageTransaction _transaction; + + public ConsoleExpirationTransaction(JobStorageTransaction transaction) + { + if (transaction == null) + throw new ArgumentNullException(nameof(transaction)); + + _transaction = transaction; + } + + public void Dispose() + { + _transaction.Dispose(); + } + + public void Expire(ConsoleId consoleId, TimeSpan expireIn) + { + if (consoleId == null) + throw new ArgumentNullException(nameof(consoleId)); + + _transaction.ExpireSet(consoleId.GetSetKey(), expireIn); + _transaction.ExpireHash(consoleId.GetHashKey(), expireIn); + + // After upgrading to Hangfire.Console version with new keys, + // there may be existing background jobs with console attached + // to the previous keys. We should expire them also. + _transaction.ExpireSet(consoleId.GetOldConsoleKey(), expireIn); + _transaction.ExpireHash(consoleId.GetOldConsoleKey(), expireIn); + } + + public void Persist(ConsoleId consoleId) + { + if (consoleId == null) + throw new ArgumentNullException(nameof(consoleId)); + + _transaction.PersistSet(consoleId.GetSetKey()); + _transaction.PersistHash(consoleId.GetHashKey()); + + // After upgrading to Hangfire.Console version with new keys, + // there may be existing background jobs with console attached + // to the previous keys. We should persist them also. + _transaction.PersistSet(consoleId.GetOldConsoleKey()); + _transaction.PersistHash(consoleId.GetOldConsoleKey()); + } + } +} diff --git a/src/Hangfire.Console/Storage/ConsoleIdExtensions.cs b/src/Hangfire.Console/Storage/ConsoleIdExtensions.cs new file mode 100644 index 0000000..ed0efb8 --- /dev/null +++ b/src/Hangfire.Console/Storage/ConsoleIdExtensions.cs @@ -0,0 +1,26 @@ +using Hangfire.Console.Serialization; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Hangfire.Console.Storage +{ + internal static class ConsoleIdExtensions + { + public static string GetSetKey(this ConsoleId consoleId) + { + return $"console:{consoleId}"; + } + + public static string GetHashKey(this ConsoleId consoleId) + { + return $"console:refs:{consoleId}"; + } + + public static string GetOldConsoleKey(this ConsoleId consoleId) + { + return consoleId.ToString(); + } + } +} diff --git a/src/Hangfire.Console/Storage/ConsoleStorage.cs b/src/Hangfire.Console/Storage/ConsoleStorage.cs index b8e9932..5c26631 100644 --- a/src/Hangfire.Console/Storage/ConsoleStorage.cs +++ b/src/Hangfire.Console/Storage/ConsoleStorage.cs @@ -19,12 +19,23 @@ public ConsoleStorage(IStorageConnection connection) _connection = (JobStorageConnection)connection; } - + public void Dispose() { _connection.Dispose(); } - + + public void InitConsole(ConsoleId consoleId) + { + if (consoleId == null) + throw new ArgumentNullException(nameof(consoleId)); + + // We add an extra "jobId" record into Hash for console, + // to correctly track TTL even if console contains no lines + + _connection.SetRangeInHash(consoleId.GetHashKey(), new[] { new KeyValuePair("jobId", consoleId.JobId) }); + } + public void AddLine(ConsoleId consoleId, ConsoleLine line) { if (consoleId == null) @@ -61,7 +72,7 @@ public void AddLine(ConsoleId consoleId, ConsoleLine line) { var referenceKey = Guid.NewGuid().ToString("N"); - tran.SetRangeInHash(GetHashKey(consoleId), new[] { new KeyValuePair(referenceKey, line.Message) }); + tran.SetRangeInHash(consoleId.GetHashKey(), new[] { new KeyValuePair(referenceKey, line.Message) }); line.Message = referenceKey; line.IsReference = true; @@ -69,27 +80,29 @@ public void AddLine(ConsoleId consoleId, ConsoleLine line) value = JobHelper.ToJson(line); } - tran.AddToSet(GetSetKey(consoleId), value, line.TimeOffset); + tran.AddToSet(consoleId.GetSetKey(), value, line.TimeOffset); tran.Commit(); } } + public TimeSpan GetConsoleTtl(ConsoleId consoleId) + { + if (consoleId == null) + throw new ArgumentNullException(nameof(consoleId)); + + return _connection.GetHashTtl(consoleId.GetHashKey()); + } + public void Expire(ConsoleId consoleId, TimeSpan expireIn) { if (consoleId == null) throw new ArgumentNullException(nameof(consoleId)); using (var tran = (JobStorageTransaction)_connection.CreateWriteTransaction()) + using (var expiration = new ConsoleExpirationTransaction(tran)) { - tran.ExpireSet(GetSetKey(consoleId), expireIn); - tran.ExpireHash(GetHashKey(consoleId), expireIn); - - // After upgrading to Hangfire.Console version with new keys, - // there may be existing background jobs with console attached - // to the previous keys. We should expire them also. - tran.ExpireSet(GetOldConsoleKey(consoleId), expireIn); - tran.ExpireHash(GetOldConsoleKey(consoleId), expireIn); + expiration.Expire(consoleId, expireIn); tran.Commit(); } @@ -100,13 +113,13 @@ public int GetLineCount(ConsoleId consoleId) if (consoleId == null) throw new ArgumentNullException(nameof(consoleId)); - var result = (int)_connection.GetSetCount(GetSetKey(consoleId)); + var result = (int)_connection.GetSetCount(consoleId.GetSetKey()); if (result == 0) { // Read operations should be backwards compatible and use // old keys, if new one don't contain any data. - return (int)_connection.GetSetCount(GetOldConsoleKey(consoleId)); + return (int)_connection.GetSetCount(consoleId.GetOldConsoleKey()); } return result; @@ -118,13 +131,13 @@ public IEnumerable GetLines(ConsoleId consoleId, int start, int end throw new ArgumentNullException(nameof(consoleId)); var useOldKeys = false; - var items = _connection.GetRangeFromSet(GetSetKey(consoleId), start, end); + var items = _connection.GetRangeFromSet(consoleId.GetSetKey(), start, end); if (items == null || items.Count == 0) { // Read operations should be backwards compatible and use // old keys, if new one don't contain any data. - items = _connection.GetRangeFromSet(GetOldConsoleKey(consoleId), start, end); + items = _connection.GetRangeFromSet(consoleId.GetOldConsoleKey(), start, end); useOldKeys = true; } @@ -138,7 +151,7 @@ public IEnumerable GetLines(ConsoleId consoleId, int start, int end { try { - line.Message = _connection.GetValueFromHash(GetOldConsoleKey(consoleId), line.Message); + line.Message = _connection.GetValueFromHash(consoleId.GetOldConsoleKey(), line.Message); } catch { @@ -149,7 +162,7 @@ public IEnumerable GetLines(ConsoleId consoleId, int start, int end } else { - line.Message = _connection.GetValueFromHash(GetHashKey(consoleId), line.Message); + line.Message = _connection.GetValueFromHash(consoleId.GetHashKey(), line.Message); } line.IsReference = false; @@ -166,20 +179,5 @@ public StateData GetState(ConsoleId consoleId) return _connection.GetStateData(consoleId.JobId); } - - private string GetSetKey(ConsoleId consoleId) - { - return $"console:{consoleId}"; - } - - private string GetHashKey(ConsoleId consoleId) - { - return $"console:refs:{consoleId}"; - } - - private string GetOldConsoleKey(ConsoleId consoleId) - { - return consoleId.ToString(); - } } } diff --git a/src/Hangfire.Console/Storage/IConsoleExpirationTransaction.cs b/src/Hangfire.Console/Storage/IConsoleExpirationTransaction.cs new file mode 100644 index 0000000..4c1a823 --- /dev/null +++ b/src/Hangfire.Console/Storage/IConsoleExpirationTransaction.cs @@ -0,0 +1,21 @@ +using Hangfire.Console.Serialization; +using System; + +namespace Hangfire.Console.Storage +{ + internal interface IConsoleExpirationTransaction : IDisposable + { + /// + /// Expire data for console. + /// + /// Console identifier + /// Expiration time + void Expire(ConsoleId consoleId, TimeSpan expireIn); + + /// + /// Persist data for console. + /// + /// Console identifier + void Persist(ConsoleId consoleId); + } +} diff --git a/src/Hangfire.Console/Storage/IConsoleStorage.cs b/src/Hangfire.Console/Storage/IConsoleStorage.cs index ded2f35..bcf960f 100644 --- a/src/Hangfire.Console/Storage/IConsoleStorage.cs +++ b/src/Hangfire.Console/Storage/IConsoleStorage.cs @@ -24,6 +24,12 @@ internal interface IConsoleStorage : IDisposable /// End index (inclusive) IEnumerable GetLines(ConsoleId consoleId, int start, int end); + /// + /// Initializes console. + /// + /// Console identifier + void InitConsole(ConsoleId consoleId); + /// /// Adds line to console. /// @@ -31,15 +37,22 @@ internal interface IConsoleStorage : IDisposable /// Line to add void AddLine(ConsoleId consoleId, ConsoleLine line); + /// + /// Returns current expiration TTL for console. + /// If console is not expired, returns negative . + /// + /// Console identifier + TimeSpan GetConsoleTtl(ConsoleId consoleId); + /// /// Expire data for console. /// /// Console identifier /// Expiration time void Expire(ConsoleId consoleId, TimeSpan expireIn); - + /// - /// Returns State object associated with console. + /// Returns last (current) state of the console's parent job. /// /// Console identifier StateData GetState(ConsoleId consoleId); diff --git a/src/Hangfire.Console/project.json b/src/Hangfire.Console/project.json index df4f55a..a1bf803 100644 --- a/src/Hangfire.Console/project.json +++ b/src/Hangfire.Console/project.json @@ -1,5 +1,5 @@ { - "version": "1.2.1-*", + "version": "1.3.0-*", "title": "Hangfire.Console", "description": "Job console for Hangfire", @@ -11,7 +11,7 @@ "owners": [ "Alexey Skalozub" ], "iconUrl": "https://raw.githubusercontent.com/pieceofsummer/Hangfire.Console/master/hangfire.console.png", "licenseUrl": "https://raw.githubusercontent.com/pieceofsummer/Hangfire.Console/master/LICENSE.md", - "releaseNotes": "- Hyperlink detection\n- Monitoring API", + "releaseNotes": "- Consoles are now expired along with parent job by default!\n- Added **FollowJobRetentionPolicy** option to switch between old/new expiration modes", "repository": { "type": "git", "url": "https://github.com/pieceofsummer/Hangfire.Console" diff --git a/tests/Hangfire.Console.Tests/Server/ConsoleContextFacts.cs b/tests/Hangfire.Console.Tests/Server/ConsoleContextFacts.cs index 289760b..f012496 100644 --- a/tests/Hangfire.Console.Tests/Server/ConsoleContextFacts.cs +++ b/tests/Hangfire.Console.Tests/Server/ConsoleContextFacts.cs @@ -30,6 +30,15 @@ public void Ctor_ThrowsException_IfStorageIsNull() Assert.Throws("storage", () => new ConsoleContext(consoleId, null)); } + [Fact] + public void Ctor_InitializesConsole() + { + var consoleId = new ConsoleId("1", new DateTime(2016, 1, 1, 0, 0, 0, DateTimeKind.Utc)); + var context = new ConsoleContext(consoleId, _storage.Object); + + _storage.Verify(x => x.InitConsole(consoleId)); + } + [Fact] public void AddLine_ThrowsException_IfLineIsNull() { @@ -99,6 +108,35 @@ public void Expire_ReallyExpiresLines() _storage.Verify(x => x.Expire(It.IsAny(), It.IsAny())); } + + [Fact] + public void FixExpiration_RequestsConsoleTtl_IgnoresIfNegative() + { + _storage.Setup(x => x.GetConsoleTtl(It.IsAny())) + .Returns(TimeSpan.FromSeconds(-1)); + + var consoleId = new ConsoleId("1", new DateTime(2016, 1, 1, 0, 0, 0, DateTimeKind.Utc)); + var context = new ConsoleContext(consoleId, _storage.Object); + context.FixExpiration(); + + _storage.Verify(x => x.GetConsoleTtl(It.IsAny())); + _storage.Verify(x => x.Expire(It.IsAny(), It.IsAny()), Times.Never); + } + + [Fact] + public void FixExpiration_RequestsConsoleTtl_ExpiresIfPositive() + { + _storage.Setup(x => x.GetConsoleTtl(It.IsAny())) + .Returns(TimeSpan.FromHours(1)); + + var consoleId = new ConsoleId("1", new DateTime(2016, 1, 1, 0, 0, 0, DateTimeKind.Utc)); + var context = new ConsoleContext(consoleId, _storage.Object); + + context.FixExpiration(); + + _storage.Verify(x => x.GetConsoleTtl(It.IsAny())); + _storage.Verify(x => x.Expire(It.IsAny(), It.IsAny())); + } } } diff --git a/tests/Hangfire.Console.Tests/Server/ConsoleServerFilterFacts.cs b/tests/Hangfire.Console.Tests/Server/ConsoleServerFilterFacts.cs index 159b53a..8f1dc3e 100644 --- a/tests/Hangfire.Console.Tests/Server/ConsoleServerFilterFacts.cs +++ b/tests/Hangfire.Console.Tests/Server/ConsoleServerFilterFacts.cs @@ -29,9 +29,9 @@ public ConsoleServerFilterFacts() } [Fact] - public void DoesNotCreateConsole_IfStateNotFound() + public void DoesNotCreateConsoleContext_IfStateNotFound() { - _connection.Setup(x => x.GetStateData(It.IsAny())) + _connection.Setup(x => x.GetStateData("1")) .Returns((StateData)null); var performer = new BackgroundJobPerformer(CreateJobFilterProvider()); @@ -44,9 +44,9 @@ public void DoesNotCreateConsole_IfStateNotFound() } [Fact] - public void DoesNotCreateConsole_IfStateIsNotProcessing() + public void DoesNotCreateConsoleContext_IfStateIsNotProcessing() { - _connection.Setup(x => x.GetStateData(It.IsAny())) + _connection.Setup(x => x.GetStateData("1")) .Returns(CreateState(SucceededState.StateName)); var performer = new BackgroundJobPerformer(CreateJobFilterProvider()); @@ -57,14 +57,14 @@ public void DoesNotCreateConsole_IfStateIsNotProcessing() var consoleContext = ConsoleContext.FromPerformContext(context); Assert.Null(consoleContext); } - + [Fact] - public void CreatesConsole_IfStateIsProcessing_DoesNotExpireData_IfCancelled() + public void CreatesConsoleContext_IfStateIsProcessing_DoesNotExpireData_IfConsoleNotPresent() { - _connection.Setup(x => x.GetStateData(It.IsAny())) + _connection.Setup(x => x.GetStateData("1")) .Returns(CreateState(ProcessingState.StateName)); _otherFilter.Setup(x => x.OnPerforming(It.IsAny())) - .Callback(x => { x.Canceled = true; }); + .Callback(x => { x.Items.Remove("ConsoleContext"); }); var performer = new BackgroundJobPerformer(CreateJobFilterProvider()); var context = CreatePerformContext(); @@ -72,34 +72,34 @@ public void CreatesConsole_IfStateIsProcessing_DoesNotExpireData_IfCancelled() performer.Perform(context); var consoleContext = ConsoleContext.FromPerformContext(context); - Assert.NotNull(consoleContext); + Assert.Null(consoleContext); _transaction.Verify(x => x.Commit(), Times.Never); } [Fact] - public void CreatesConsole_IfStateIsProcessing_DoesNotExpireData_IfConsoleNotPresent() + public void CreatesConsoleContext_IfStateIsProcessing_FixesExpiration_IfFollowsJobRetention() { - _connection.Setup(x => x.GetStateData(It.IsAny())) + _connection.Setup(x => x.GetStateData("1")) .Returns(CreateState(ProcessingState.StateName)); - _otherFilter.Setup(x => x.OnPerforming(It.IsAny())) - .Callback(x => { x.Items.Remove("ConsoleContext"); }); - var performer = new BackgroundJobPerformer(CreateJobFilterProvider()); + var performer = new BackgroundJobPerformer(CreateJobFilterProvider(true)); var context = CreatePerformContext(); performer.Perform(context); var consoleContext = ConsoleContext.FromPerformContext(context); - Assert.Null(consoleContext); + Assert.NotNull(consoleContext); - _transaction.Verify(x => x.Commit(), Times.Never); + _connection.Verify(x => x.GetHashTtl(It.IsAny())); + + _transaction.Verify(x => x.Commit()); } [Fact] - public void CreatesConsole_IfStateIsProcessing_ExpiresData_IfNotCancelled() + public void CreatesConsoleContext_IfStateIsProcessing_ExpiresData_IfNotFollowsJobRetention() { - _connection.Setup(x => x.GetStateData(It.IsAny())) + _connection.Setup(x => x.GetStateData("1")) .Returns(CreateState(ProcessingState.StateName)); var performer = new BackgroundJobPerformer(CreateJobFilterProvider()); @@ -110,6 +110,11 @@ public void CreatesConsole_IfStateIsProcessing_ExpiresData_IfNotCancelled() var consoleContext = ConsoleContext.FromPerformContext(context); Assert.NotNull(consoleContext); + _connection.Verify(x => x.GetHashTtl(It.IsAny()), Times.Never); + + _transaction.Verify(x => x.ExpireSet(It.IsAny(), It.IsAny())); + _transaction.Verify(x => x.ExpireHash(It.IsAny(), It.IsAny())); + _transaction.Verify(x => x.Commit()); } @@ -117,18 +122,18 @@ public static void JobMethod() { } - private IJobFilterProvider CreateJobFilterProvider() + private IJobFilterProvider CreateJobFilterProvider(bool followJobRetention = false) { var filters = new JobFilterCollection(); - filters.Add(new ConsoleServerFilter(new ConsoleOptions())); + filters.Add(new ConsoleServerFilter(new ConsoleOptions() { FollowJobRetentionPolicy = followJobRetention })); filters.Add(_otherFilter.Object); - return filters; + return new JobFilterProviderCollection(filters); } private PerformContext CreatePerformContext() { return new PerformContext(_connection.Object, - new BackgroundJob("1", Common.Job.FromExpression(() => JobMethod()), DateTime.UtcNow), + new BackgroundJob("1", Job.FromExpression(() => JobMethod()), DateTime.UtcNow), _cancellationToken.Object); } diff --git a/tests/Hangfire.Console.Tests/States/ConsoleApplyStateFilterFacts.cs b/tests/Hangfire.Console.Tests/States/ConsoleApplyStateFilterFacts.cs new file mode 100644 index 0000000..16c0c8c --- /dev/null +++ b/tests/Hangfire.Console.Tests/States/ConsoleApplyStateFilterFacts.cs @@ -0,0 +1,207 @@ +using Hangfire.Common; +using Hangfire.Console.States; +using Hangfire.States; +using Hangfire.Storage; +using Hangfire.Storage.Monitoring; +using Moq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace Hangfire.Console.Tests.States +{ + public class ConsoleApplyStateFilterFacts + { + private readonly Mock _otherFilter; + private readonly Mock _storage; + private readonly Mock _connection; + private readonly Mock _transaction; + private readonly Mock _monitoring; + + public ConsoleApplyStateFilterFacts() + { + _otherFilter = new Mock(); + _storage = new Mock(); + _connection = new Mock(); + _transaction = new Mock(); + _monitoring = new Mock(); + + _storage.Setup(x => x.GetConnection()) + .Returns(_connection.Object); + _storage.Setup(x => x.GetMonitoringApi()) + .Returns(_monitoring.Object); + + _connection.Setup(x => x.CreateWriteTransaction()) + .Returns(_transaction.Object); + } + + [Fact] + public void UsesFinalJobExpirationTimeoutValue() + { + _otherFilter.Setup(x => x.OnStateApplied(It.IsAny(), It.IsAny())) + .Callback((c, t) => c.JobExpirationTimeout = TimeSpan.FromSeconds(123)); + _connection.Setup(x => x.GetJobData("1")) + .Returns(CreateJobData(ProcessingState.StateName)); + _monitoring.Setup(x => x.JobDetails("1")) + .Returns(CreateJobDetails()); + + var stateChanger = new BackgroundJobStateChanger(CreateJobFilterProvider()); + var context = CreateStateChangeContext(new MockSucceededState()); + + stateChanger.ChangeState(context); + + _transaction.Verify(x => x.ExpireJob(It.IsAny(), TimeSpan.FromSeconds(123))); + _transaction.Verify(x => x.ExpireSet(It.IsAny(), TimeSpan.FromSeconds(123))); + } + + [Fact] + public void DoesNotExpire_IfNotFollowsJobRetention() + { + _connection.Setup(x => x.GetJobData("1")) + .Returns(CreateJobData(ProcessingState.StateName)); + _monitoring.Setup(x => x.JobDetails("1")) + .Returns(CreateJobDetails()); + + var stateChanger = new BackgroundJobStateChanger(CreateJobFilterProvider(false)); + var context = CreateStateChangeContext(new MockSucceededState()); + + stateChanger.ChangeState(context); + + _transaction.Verify(x => x.ExpireSet(It.IsAny(), It.IsAny()), Times.Never); + _transaction.Verify(x => x.ExpireHash(It.IsAny(), It.IsAny()), Times.Never); + } + + [Fact] + public void Expires_IfStateIsFinal() + { + _connection.Setup(x => x.GetJobData("1")) + .Returns(CreateJobData(ProcessingState.StateName)); + _monitoring.Setup(x => x.JobDetails("1")) + .Returns(CreateJobDetails()); + + var stateChanger = new BackgroundJobStateChanger(CreateJobFilterProvider()); + var context = CreateStateChangeContext(new MockSucceededState()); + + stateChanger.ChangeState(context); + + _transaction.Verify(x => x.ExpireSet(It.IsAny(), It.IsAny())); + _transaction.Verify(x => x.ExpireHash(It.IsAny(), It.IsAny())); + } + + [Fact] + public void Persists_IfStateIsNotFinal() + { + _connection.Setup(x => x.GetJobData("1")) + .Returns(CreateJobData(ProcessingState.StateName)); + _monitoring.Setup(x => x.JobDetails("1")) + .Returns(CreateJobDetails()); + + var stateChanger = new BackgroundJobStateChanger(CreateJobFilterProvider()); + var context = CreateStateChangeContext(new MockFailedState()); + + stateChanger.ChangeState(context); + + _transaction.Verify(x => x.PersistSet(It.IsAny())); + _transaction.Verify(x => x.PersistHash(It.IsAny())); + } + + private IJobFilterProvider CreateJobFilterProvider(bool followJobRetention = true) + { + var filters = new JobFilterCollection(); + filters.Add(new ConsoleApplyStateFilter(new ConsoleOptions() { FollowJobRetentionPolicy = followJobRetention }), int.MaxValue); + filters.Add(_otherFilter.Object); + return new JobFilterProviderCollection(filters); + } + + public class MockSucceededState : IState + { + public string Name => SucceededState.StateName; + + public string Reason => null; + + public bool IsFinal => true; + + public bool IgnoreJobLoadException => false; + + public Dictionary SerializeData() + { + return new Dictionary(); + } + } + + public class MockFailedState : IState + { + public string Name => FailedState.StateName; + + public string Reason => null; + + public bool IsFinal => false; + + public bool IgnoreJobLoadException => false; + + public Dictionary SerializeData() + { + return new Dictionary(); + } + } + + private StateChangeContext CreateStateChangeContext(IState state) + { + return new StateChangeContext(_storage.Object, _connection.Object, "1", state); + } + + public static void JobMethod() + { + } + + private JobDetailsDto CreateJobDetails() + { + var date = DateTime.UtcNow.AddHours(-1); + var history = new List(); + + history.Add(new StateHistoryDto() + { + StateName = EnqueuedState.StateName, + CreatedAt = date, + Data = new Dictionary() + { + ["EnqueuedAt"] = JobHelper.SerializeDateTime(date), + ["Queue"] = EnqueuedState.DefaultQueue + } + }); + + history.Add(new StateHistoryDto() + { + StateName = ProcessingState.StateName, + CreatedAt = date.AddSeconds(2), + Data = new Dictionary() + { + ["StartedAt"] = JobHelper.SerializeDateTime(date.AddSeconds(2)), + ["ServerId"] = "SERVER-1", + ["WorkerId"] = "WORKER-1" + } + }); + + history.Reverse(); + + return new JobDetailsDto() + { + CreatedAt = history[0].CreatedAt, + Job = Job.FromExpression(() => JobMethod()), + History = history + }; + } + + private JobData CreateJobData(string state) + { + return new JobData() + { + CreatedAt = DateTime.UtcNow.AddHours(-1), + Job = Job.FromExpression(() => JobMethod()), + State = state + }; + } + } +} diff --git a/tests/Hangfire.Console.Tests/Storage/ConsoleExpirationTransactionFacts.cs b/tests/Hangfire.Console.Tests/Storage/ConsoleExpirationTransactionFacts.cs new file mode 100644 index 0000000..f33baf1 --- /dev/null +++ b/tests/Hangfire.Console.Tests/Storage/ConsoleExpirationTransactionFacts.cs @@ -0,0 +1,86 @@ +using Hangfire.Console.Serialization; +using Hangfire.Console.Storage; +using Hangfire.Storage; +using Moq; +using System; +using Xunit; + +namespace Hangfire.Console.Tests.Storage +{ + public class ConsoleExpirationTransactionFacts + { + private readonly ConsoleId _consoleId; + + private readonly Mock _transaction; + + public ConsoleExpirationTransactionFacts() + { + _consoleId = new ConsoleId("1", DateTime.UtcNow); + + _transaction = new Mock(); + } + + [Fact] + public void Ctor_ThrowsException_IfTransactionIsNull() + { + Assert.Throws("transaction", () => new ConsoleExpirationTransaction(null)); + } + + [Fact] + public void Dispose_ReallyDisposesTransaction() + { + var expiration = new ConsoleExpirationTransaction(_transaction.Object); + + expiration.Dispose(); + + _transaction.Verify(x => x.Dispose()); + } + + [Fact] + public void Expire_ThrowsException_IfConsoleIdIsNull() + { + var expiration = new ConsoleExpirationTransaction(_transaction.Object); + + Assert.Throws("consoleId", () => expiration.Expire(null, TimeSpan.FromHours(1))); + } + + [Fact] + public void Expire_ExpiresSetAndHash() + { + var expiration = new ConsoleExpirationTransaction(_transaction.Object); + + expiration.Expire(_consoleId, TimeSpan.FromHours(1)); + + _transaction.Verify(x => x.ExpireSet(_consoleId.GetSetKey(), It.IsAny())); + _transaction.Verify(x => x.ExpireHash(_consoleId.GetHashKey(), It.IsAny())); + + // backward compatibility: + _transaction.Verify(x => x.ExpireSet(_consoleId.GetOldConsoleKey(), It.IsAny())); + _transaction.Verify(x => x.ExpireHash(_consoleId.GetOldConsoleKey(), It.IsAny())); + } + + [Fact] + public void Persist_ThrowsException_IfConsoleIdIsNull() + { + var expiration = new ConsoleExpirationTransaction(_transaction.Object); + + Assert.Throws("consoleId", () => expiration.Persist(null)); + } + + [Fact] + public void Persist_PersistsSetAndHash() + { + var expiration = new ConsoleExpirationTransaction(_transaction.Object); + + expiration.Persist(_consoleId); + + _transaction.Verify(x => x.PersistSet(_consoleId.GetSetKey())); + _transaction.Verify(x => x.PersistHash(_consoleId.GetHashKey())); + + // backward compatibility: + _transaction.Verify(x => x.PersistSet(_consoleId.GetOldConsoleKey())); + _transaction.Verify(x => x.PersistHash(_consoleId.GetOldConsoleKey())); + } + + } +} diff --git a/tests/Hangfire.Console.Tests/Storage/ConsoleStorageFacts.cs b/tests/Hangfire.Console.Tests/Storage/ConsoleStorageFacts.cs index 59a4b81..68e8dd3 100644 --- a/tests/Hangfire.Console.Tests/Storage/ConsoleStorageFacts.cs +++ b/tests/Hangfire.Console.Tests/Storage/ConsoleStorageFacts.cs @@ -45,6 +45,24 @@ public void Dispose_ReallyDisposesConnection() _connection.Verify(x => x.Dispose()); } + [Fact] + public void InitConsole_ThrowsException_IfConsoleIdIsNull() + { + var storage = new ConsoleStorage(_connection.Object); + + Assert.Throws("consoleId", () => storage.InitConsole(null)); + } + + [Fact] + public void InitConsole_JobIdIsAddedToHash() + { + var storage = new ConsoleStorage(_connection.Object); + + storage.InitConsole(_consoleId); + + _connection.Verify(x => x.SetRangeInHash(_consoleId.GetHashKey(), It.IsAny>>())); + } + [Fact] public void AddLine_ThrowsException_IfConsoleIdIsNull() { @@ -78,8 +96,8 @@ public void AddLine_ShortLineIsAddedToSet() storage.AddLine(_consoleId, line); Assert.False(line.IsReference); - _transaction.Verify(x => x.AddToSet($"console:{_consoleId}", It.IsAny(), It.IsAny())); - _transaction.Verify(x => x.SetRangeInHash($"console:refs:{_consoleId}", It.IsAny>>()), Times.Never); + _transaction.Verify(x => x.AddToSet(_consoleId.GetSetKey(), It.IsAny(), It.IsAny())); + _transaction.Verify(x => x.SetRangeInHash(_consoleId.GetHashKey(), It.IsAny>>()), Times.Never); } [Fact] @@ -98,8 +116,8 @@ public void AddLine_LongLineIsAddedToHash_AndReferenceIsAddedToSet() storage.AddLine(_consoleId, line); Assert.True(line.IsReference); - _transaction.Verify(x => x.AddToSet($"console:{_consoleId}", It.IsAny(), It.IsAny())); - _transaction.Verify(x => x.SetRangeInHash($"console:refs:{_consoleId}", It.IsAny>>())); + _transaction.Verify(x => x.AddToSet(_consoleId.GetSetKey(), It.IsAny(), It.IsAny())); + _transaction.Verify(x => x.SetRangeInHash(_consoleId.GetHashKey(), It.IsAny>>())); } [Fact] @@ -117,19 +135,48 @@ public void Expire_ExpiresSetAndHash() storage.Expire(_consoleId, TimeSpan.FromHours(1)); - _transaction.Verify(x => x.ExpireSet($"console:{_consoleId}", It.IsAny())); - _transaction.Verify(x => x.ExpireHash($"console:refs:{_consoleId}", It.IsAny())); - } + _connection.Verify(x => x.CreateWriteTransaction()); + + _transaction.Verify(x => x.ExpireSet(_consoleId.GetSetKey(), It.IsAny())); + _transaction.Verify(x => x.ExpireHash(_consoleId.GetHashKey(), It.IsAny())); + _transaction.Verify(x => x.Commit()); + } + [Fact] public void Expire_ExpiresOldSetAndHashKeysEither_ForBackwardsCompatibility() { var storage = new ConsoleStorage(_connection.Object); - + storage.Expire(_consoleId, TimeSpan.FromHours(1)); - _transaction.Verify(x => x.ExpireSet(_consoleId.ToString(), It.IsAny())); - _transaction.Verify(x => x.ExpireHash(_consoleId.ToString(), It.IsAny())); + _connection.Verify(x => x.CreateWriteTransaction()); + + _transaction.Verify(x => x.ExpireSet(_consoleId.GetOldConsoleKey(), It.IsAny())); + _transaction.Verify(x => x.ExpireHash(_consoleId.GetOldConsoleKey(), It.IsAny())); + + _transaction.Verify(x => x.Commit()); + } + + [Fact] + public void GetConsoleTtl_ThrowsException_IfConsoleIdIsNull() + { + var storage = new ConsoleStorage(_connection.Object); + + Assert.Throws("consoleId", () => storage.GetConsoleTtl(null)); + } + + [Fact] + public void GetConsoleTtl_ReturnsTtlOfHash() + { + _connection.Setup(x => x.GetHashTtl(_consoleId.GetHashKey())) + .Returns(TimeSpan.FromSeconds(123)); + + var storage = new ConsoleStorage(_connection.Object); + + var ttl = storage.GetConsoleTtl(_consoleId); + + Assert.Equal(TimeSpan.FromSeconds(123), ttl); } [Fact] @@ -143,7 +190,7 @@ public void GetLineCount_ThrowsException_IfConsoleIdIsNull() [Fact] public void GetLineCount_ReturnsCountOfSet() { - _connection.Setup(x => x.GetSetCount($"console:{_consoleId}")) + _connection.Setup(x => x.GetSetCount(_consoleId.GetSetKey())) .Returns(123); var storage = new ConsoleStorage(_connection.Object); @@ -156,8 +203,8 @@ public void GetLineCount_ReturnsCountOfSet() [Fact] public void GetLineCount_ReturnsCountOfOldSet_WhenNewOneReturnsZero_ForBackwardsCompatibility() { - _connection.Setup(x => x.GetSetCount($"console:{_consoleId}")).Returns(0); - _connection.Setup(x => x.GetSetCount(_consoleId.ToString())).Returns(123); + _connection.Setup(x => x.GetSetCount(_consoleId.GetSetKey())).Returns(0); + _connection.Setup(x => x.GetSetCount(_consoleId.GetOldConsoleKey())).Returns(123); var storage = new ConsoleStorage(_connection.Object); @@ -184,7 +231,7 @@ public void GetLines_ReturnsRangeFromSet() new ConsoleLine { TimeOffset = 3, Message = "line4" }, }; - _connection.Setup(x => x.GetRangeFromSet($"console:{_consoleId}", It.IsAny(), It.IsAny())) + _connection.Setup(x => x.GetRangeFromSet(_consoleId.GetSetKey(), It.IsAny(), It.IsAny())) .Returns((string key, int start, int end) => lines.Where((x, i) => i >= start && i <= end).Select(JobHelper.ToJson).ToList()); var storage = new ConsoleStorage(_connection.Object); @@ -204,7 +251,7 @@ public void GetLines_ReturnsRangeFromOldSet_ForBackwardsCompatibility() new ConsoleLine { TimeOffset = 3, Message = "line4" }, }; - _connection.Setup(x => x.GetRangeFromSet(_consoleId.ToString(), It.IsAny(), It.IsAny())) + _connection.Setup(x => x.GetRangeFromSet(_consoleId.GetOldConsoleKey(), It.IsAny(), It.IsAny())) .Returns((string key, int start, int end) => lines.Where((x, i) => i >= start && i <= end).Select(JobHelper.ToJson).ToList()); var storage = new ConsoleStorage(_connection.Object); @@ -221,9 +268,9 @@ public void GetLines_ExpandsReferences() new ConsoleLine { TimeOffset = 0, Message = "line1", IsReference = true } }; - _connection.Setup(x => x.GetRangeFromSet($"console:{_consoleId}", It.IsAny(), It.IsAny())) + _connection.Setup(x => x.GetRangeFromSet(_consoleId.GetSetKey(), It.IsAny(), It.IsAny())) .Returns((string key, int start, int end) => lines.Where((x, i) => i >= start && i <= end).Select(JobHelper.ToJson).ToList()); - _connection.Setup(x => x.GetValueFromHash($"console:refs:{_consoleId}", It.IsAny())) + _connection.Setup(x => x.GetValueFromHash(_consoleId.GetHashKey(), It.IsAny())) .Returns("Dereferenced Line"); var storage = new ConsoleStorage(_connection.Object); @@ -241,9 +288,9 @@ public void GetLines_ExpandsReferencesFromOldHash_ForBackwardsCompatibility() new ConsoleLine { TimeOffset = 0, Message = "line1", IsReference = true } }; - _connection.Setup(x => x.GetRangeFromSet(_consoleId.ToString(), It.IsAny(), It.IsAny())) + _connection.Setup(x => x.GetRangeFromSet(_consoleId.GetOldConsoleKey(), It.IsAny(), It.IsAny())) .Returns((string key, int start, int end) => lines.Where((x, i) => i >= start && i <= end).Select(JobHelper.ToJson).ToList()); - _connection.Setup(x => x.GetValueFromHash(_consoleId.ToString(), It.IsAny())) + _connection.Setup(x => x.GetValueFromHash(_consoleId.GetOldConsoleKey(), It.IsAny())) .Returns("Dereferenced Line"); var storage = new ConsoleStorage(_connection.Object); @@ -261,10 +308,10 @@ public void GetLines_HandlesHashException_WhenTryingToExpandReferences() new ConsoleLine { TimeOffset = 0, Message = "line1", IsReference = true } }; - _connection.Setup(x => x.GetRangeFromSet(_consoleId.ToString(), It.IsAny(), It.IsAny())) + _connection.Setup(x => x.GetRangeFromSet(_consoleId.GetOldConsoleKey(), It.IsAny(), It.IsAny())) .Returns((string key, int start, int end) => lines.Where((x, i) => i >= start && i <= end).Select(JobHelper.ToJson).ToList()); - _connection.Setup(x => x.GetValueFromHash(_consoleId.ToString(), It.IsAny())) + _connection.Setup(x => x.GetValueFromHash(_consoleId.GetOldConsoleKey(), It.IsAny())) .Throws(new NotSupportedException()); var storage = new ConsoleStorage(_connection.Object);