Skip to content

Commit

Permalink
Handling of ResourceLimitExceeded in Notfirications Email implemented…
Browse files Browse the repository at this point in the history
… #GCPActive (#313)

Co-authored-by: Terje Holene <[email protected]>
  • Loading branch information
acn-sbuad and SandGrainOne authored Dec 4, 2023
1 parent c755a60 commit c71a508
Show file tree
Hide file tree
Showing 27 changed files with 698 additions and 54 deletions.
18 changes: 18 additions & 0 deletions src/Altinn.Notifications.Core/Enums/AltinnServiceUpdateSchema.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace Altinn.Notifications.Core.Enums;

/// <summary>
/// Enum describing the various Altinn Service update schemas
/// </summary>
public enum AltinnServiceUpdateSchema
{
/// <summary>
/// Default value for unknown schema
/// </summary>
Unkown,

/// <summary>
/// The resource limit exceeded schema
/// </summary>
/// <remarks>Data of this schema should be mapped to a <see cref="ResourceLimitExceeded"/> object</remarks>
ResourceLimitExceeded
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public static void AddCoreServices(this IServiceCollection services, IConfigurat
.AddSingleton<IEmailNotificationOrderService, EmailNotificationOrderService>()
.AddSingleton<INotificationSummaryService, NotificationSummaryService>()
.AddSingleton<IEmailNotificationService, EmailNotificationService>()
.AddSingleton<IAltinnServiceUpdateService, AltinnServiceUpdateService>()
.AddSingleton<INotificationsEmailServiceUpdateService, NotificationsEmailServiceUpdateService>()
.Configure<KafkaSettings>(config.GetSection("KafkaSettings"))
.Configure<NotificationOrderConfig>(config.GetSection("NotificationOrderConfig"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System.Text.Json;
using System.Text.Json.Serialization;

using Altinn.Notifications.Core.Enums;

namespace Altinn.Notifications.Core.Models.AltinnServiceUpdate
{
/// <summary>
/// A class representing a generic service update
/// </summary>
public class GenericServiceUpdate
{
private static readonly JsonSerializerOptions _serializerOptions = new JsonSerializerOptions()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingDefault,
WriteIndented = true,
Converters = { new JsonStringEnumConverter() },
PropertyNameCaseInsensitive = true
};

/// <summary>
/// The source of the service update
/// </summary>
public string Source { get; set; } = string.Empty;

/// <summary>
/// The schema of the service update data
/// </summary>
public AltinnServiceUpdateSchema Schema { get; set; }

/// <summary>
/// The data of the service update as a json serialized string
/// </summary>
public string Data { get; set; } = string.Empty;

/// <summary>
/// Serialize the <see cref="GenericServiceUpdate"/> into a json string
/// </summary>
/// <returns></returns>
public string Serialize()
{
return JsonSerializer.Serialize(this, _serializerOptions);
}

/// <summary>
/// Try to parse a json string into a<see cref="GenericServiceUpdate"/>
/// </summary>
public static bool TryParse(string input, out GenericServiceUpdate value)
{
GenericServiceUpdate? parsedOutput;
value = new GenericServiceUpdate();

if (string.IsNullOrEmpty(input))
{
return false;
}

try
{
parsedOutput = JsonSerializer.Deserialize<GenericServiceUpdate>(input!, _serializerOptions);

value = parsedOutput!;
return !string.IsNullOrEmpty(value.Source);
}
catch
{
// try parse, we simply return false if fails
}

return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Altinn.Notifications.Core.Models.AltinnServiceUpdate
{
/// <summary>
/// A class holding data on an exceeded resource limit in an Altinn service
/// </summary>
public class ResourceLimitExceeded
{
private static readonly JsonSerializerOptions _serializerOptions = new JsonSerializerOptions()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingDefault,
WriteIndented = true,
Converters = { new JsonStringEnumConverter() },
PropertyNameCaseInsensitive = true
};

/// <summary>
/// The resource that has reached its capacity limit
/// </summary>
public string Resource { get; set; } = string.Empty;

/// <summary>
/// The timestamp for when the service is available again
/// </summary>
public DateTime ResetTime { get; set; }

/// <summary>
/// Serialize the <see cref="ResourceLimitExceeded"/> into a json string
/// </summary>
/// <returns></returns>
public string Serialize()
{
return JsonSerializer.Serialize(this, _serializerOptions);
}

/// <summary>
/// Try to parse a json string into a<see cref="ResourceLimitExceeded"/>
/// </summary>
public static bool Tryparse(string input, out ResourceLimitExceeded value)
{
ResourceLimitExceeded? parsedOutput;
value = new ResourceLimitExceeded();

if (string.IsNullOrEmpty(input))
{
return false;
}

try
{
parsedOutput = JsonSerializer.Deserialize<ResourceLimitExceeded>(input!, _serializerOptions);

value = parsedOutput!;
return !string.IsNullOrEmpty(value.Resource);
}
catch
{
// try parse, we simply return false if fails
}

return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Altinn.Notifications.Core.Repository.Interfaces
{
/// <summary>
/// Interface for handling actions towards the resource limits table
/// </summary>
public interface IResourceLimitRepository
{
/// <summary>
/// Sets the timeout flag for a given resource
/// </summary>
/// <param name="timeout">The date time for when the resource limit is reset</param>
/// <returns>A boolean indicating if the operation was successful</returns>
Task<bool> SetEmailTimeout(DateTime timeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Altinn.Notifications.Core.Enums;
using Altinn.Notifications.Core.Services.Interfaces;

using Microsoft.Extensions.Logging;

namespace Altinn.Notifications.Core.Services
{
/// <summary>
/// Implementation of the <see cref="IAltinnServiceUpdateService"/> interface
/// </summary>
public class AltinnServiceUpdateService : IAltinnServiceUpdateService
{
private readonly INotificationsEmailServiceUpdateService _notificationsEmail;
private readonly ILogger<AltinnServiceUpdateService> _logger;

/// <summary>
/// Initializes a new instance of the <see cref="AltinnServiceUpdateService"/> class.
/// </summary>
public AltinnServiceUpdateService(
INotificationsEmailServiceUpdateService notificationsEmail,
ILogger<AltinnServiceUpdateService> logger)
{
_notificationsEmail = notificationsEmail;
_logger = logger;
}

/// <inheritdoc/>
public async Task HandleServiceUpdate(string source, AltinnServiceUpdateSchema schema, string serializedData)
{
switch (source)
{
case "platform-notifications-email":
await _notificationsEmail.HandleServiceUpdate(schema, serializedData);
return;
default:
_logger.LogInformation("// AltinnServiceUpdateService // HandleServiceUpdate// Received update from unknown service {service}.", source);
return;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Altinn.Notifications.Core.Enums;

namespace Altinn.Notifications.Core.Services.Interfaces
{
/// <summary>
/// Interface describing the service responding to service updates from Altinn components
/// </summary>
public interface IAltinnServiceUpdateService
{
/// <summary>
/// Method for handling an update from a service
/// </summary>
public Task HandleServiceUpdate(string source, AltinnServiceUpdateSchema schema, string serializedData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Altinn.Notifications.Core.Enums;

namespace Altinn.Notifications.Core.Services.Interfaces
{
/// <summary>
/// Interface describing the service responding to service updates from the Notifications Email component
/// </summary>
public interface INotificationsEmailServiceUpdateService
{
/// <summary>
/// Method for handling an incoming service update
/// </summary>
public Task HandleServiceUpdate(AltinnServiceUpdateSchema schema, string serializedData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System.Text.Json;

using Altinn.Notifications.Core.Enums;
using Altinn.Notifications.Core.Models.AltinnServiceUpdate;
using Altinn.Notifications.Core.Repository.Interfaces;
using Altinn.Notifications.Core.Services.Interfaces;

using Microsoft.Extensions.Logging;

namespace Altinn.Notifications.Core.Services
{
/// <summary>
/// Implementation of the <see cref="INotificationsEmailServiceUpdateService"/> interface
/// </summary>
public class NotificationsEmailServiceUpdateService : INotificationsEmailServiceUpdateService
{
private readonly IResourceLimitRepository _resourceLimitRepository;
private readonly ILogger<NotificationsEmailServiceUpdateService> _logger;

/// <summary>
/// Initializes a new instance of the <see cref="NotificationsEmailServiceUpdateService"/> class.
/// </summary>
public NotificationsEmailServiceUpdateService(
IResourceLimitRepository resourceLimitRepository,
ILogger<NotificationsEmailServiceUpdateService> logger)
{
_resourceLimitRepository = resourceLimitRepository;
_logger = logger;
}

/// <inheritdoc/>
public async Task HandleServiceUpdate(AltinnServiceUpdateSchema schema, string serializedData)
{
switch (schema)
{
case AltinnServiceUpdateSchema.ResourceLimitExceeded:
bool success = ResourceLimitExceeded.Tryparse(serializedData, out ResourceLimitExceeded update);

if (!success)
{
_logger.LogError("// NotificationsEmailServiceUpdateService // HandleServiceUpdate // Failed to parse message {message} into schema {schema}", serializedData, schema);
return;
}

await HandleResourceLimitExceeded(update);
return;
}
}

private async Task HandleResourceLimitExceeded(ResourceLimitExceeded update)
{
await _resourceLimitRepository.SetEmailTimeout(update.ResetTime);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public class KafkaSettings
/// </summary>
public string EmailStatusUpdatedTopicName { get; set; } = string.Empty;

/// <summary>
/// The name of the platform service update topic
/// </summary>
public string AltinnServiceUpdateTopicName { get; set; } = string.Empty;

/// <summary>
/// The name of the health check topic
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static void AddKafkaServices(this IServiceCollection services, IConfigura
.AddHostedService<PastDueOrdersConsumer>()
.AddHostedService<PastDueOrdersRetryConsumer>()
.AddHostedService<EmailStatusConsumer>()
.AddHostedService<AltinnServiceUpdateConsumer>()
.Configure<KafkaSettings>(config.GetSection(nameof(KafkaSettings)));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using Altinn.Notifications.Core.Models.AltinnServiceUpdate;
using Altinn.Notifications.Core.Services.Interfaces;
using Altinn.Notifications.Integrations.Configuration;

using Microsoft.Extensions.Logging;

using Microsoft.Extensions.Options;

namespace Altinn.Notifications.Integrations.Kafka.Consumers
{
/// <summary>
/// Kafka consumer class for Altinn service updates
/// </summary>
public class AltinnServiceUpdateConsumer : KafkaConsumerBase<AltinnServiceUpdateConsumer>
{
private readonly IAltinnServiceUpdateService _serviceUpdate;
private readonly ILogger<AltinnServiceUpdateConsumer> _logger;

/// <summary>
/// Initializes a new instance of the <see cref="AltinnServiceUpdateConsumer"/> class.
/// </summary>
public AltinnServiceUpdateConsumer(
IAltinnServiceUpdateService serviceUpdate,
IOptions<KafkaSettings> settings,
ILogger<AltinnServiceUpdateConsumer> logger)
: base(settings, logger, settings.Value.AltinnServiceUpdateTopicName)
{
_serviceUpdate = serviceUpdate;
_logger = logger;
}

/// <inheritdoc/>
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.Run(() => ConsumeMessage(ProcessServiceUpdate, RetryServiceUpdate, stoppingToken), stoppingToken);
}

private async Task ProcessServiceUpdate(string message)
{
bool succeeded = GenericServiceUpdate.TryParse(message, out GenericServiceUpdate update);

if (!succeeded)
{
_logger.LogError("// AltinnServiceUpdateConsumer // ProcessServiceUpdate // Deserialization of message failed. {Message}", message);
return;
}

await _serviceUpdate.HandleServiceUpdate(update.Source.ToLower().Trim(), update.Schema, update.Data);
}

private async Task RetryServiceUpdate(string message)
{
// Making a second attempt, but no further action if it fails again.
await ProcessServiceUpdate(message);
}
}
}
Loading

0 comments on commit c71a508

Please sign in to comment.