Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configured the EventGrid Reaction to use different schemas #138

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cli/service/resources/default-reaction-providers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ spec:
type: string
eventGridKey:
type: string
eventGridSchema:
type: string
enum:
- "CloudEvents"
- "EventGrid"
default: "CloudEvents"
format:
type: string
enum:
- "packed"
- "unpacked"
default: "packed"
required:
- eventGridUri
---
Expand Down
1 change: 0 additions & 1 deletion reactions/azure/eventgrid-reaction/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
var configuration = sp.GetRequiredService<IConfiguration>();
var logger = sp.GetRequiredService<ILogger<EventGridPublisherClient>>();
var eventGridUri = configuration.GetValue<string>("eventGridUri");
// var eventGridKey = configuration.GetValue<string>("eventGridKey");

EventGridPublisherClient publisherClient;
switch (configuration.GetIdentityType())
Expand Down
73 changes: 55 additions & 18 deletions reactions/azure/eventgrid-reaction/Services/ChangeHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,24 @@ namespace Drasi.Reactions.EventGrid.Services;
// using CloudNative.CloudEvents;




public class ChangeHandler : IChangeEventHandler
{
private readonly EventGridPublisherClient _publisherClient;
private readonly OutputFormat _format;
private readonly IChangeFormatter _formatter;
private readonly ILogger<ChangeHandler> _logger;

private readonly EventGridSchema _eventGridSchema;

public ChangeHandler(EventGridPublisherClient publisherClient,IConfiguration config, IChangeFormatter formatter, ILogger<ChangeHandler> logger)
{
_publisherClient = publisherClient;
_format = Enum.Parse<OutputFormat>(config.GetValue("format", "packed") ?? "packed", true);
_formatter = formatter;
_logger = logger;
_eventGridSchema = Enum.Parse<EventGridSchema>(config.GetValue<string>("eventGridSchema"));
}

public async Task HandleChange(ChangeEvent evt, object? queryConfig)
Expand All @@ -47,27 +52,53 @@ public async Task HandleChange(ChangeEvent evt, object? queryConfig)
switch(_format)
{
case OutputFormat.Packed:
CloudEvent egEvent = new CloudEvent(evt.QueryId, "Drasi.ChangeEvent", _formatter.Format(evt));
var resp = await _publisherClient.SendEventAsync(egEvent);
if (resp.IsError)
{
_logger.LogError($"Error sending message to Event Grid: {resp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {resp.Content.ToString()}");
if (_eventGridSchema == EventGridSchema.CloudEvents) {
CloudEvent cloudEvent = new CloudEvent(evt.QueryId, "Drasi.ChangeEvent", _formatter.Format(evt));
var resp = await _publisherClient.SendEventAsync(cloudEvent);
if (resp.IsError)
{
_logger.LogError($"Error sending message to Event Grid: {resp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {resp.Content.ToString()}");
}
} else if (_eventGridSchema == EventGridSchema.EventGrid) {
EventGridEvent egEvent = new EventGridEvent(evt.QueryId, "Drasi.ChangeEvent", "1", _formatter.Format(evt));
var resp = await _publisherClient.SendEventAsync(egEvent);
if (resp.IsError)
{
_logger.LogError($"Error sending message to Event Grid: {resp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {resp.Content.ToString()}");
}
}
break;

case OutputFormat.Unpacked:
var formattedResults = _formatter.Format(evt);
List<CloudEvent> events = new List<CloudEvent>();
foreach (var notification in formattedResults)
{
CloudEvent currEvent = new CloudEvent(evt.QueryId, "Drasi.ChangeEvent", notification);
events.Add(currEvent);
}
var currResp = await _publisherClient.SendEventsAsync(events);
if (currResp.IsError)
{
_logger.LogError($"Error sending message to Event Grid: {currResp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {currResp.Content.ToString()}");
if (_eventGridSchema == EventGridSchema.EventGrid) {
List<EventGridEvent> events = new List<EventGridEvent>();
foreach (var notification in formattedResults)
{
EventGridEvent currEvent = new EventGridEvent(evt.QueryId, "Drasi.ChangeEvent", "1", notification);
events.Add(currEvent);
}
var currResp = await _publisherClient.SendEventsAsync(events);
if (currResp.IsError)
{
_logger.LogError($"Error sending message to Event Grid: {currResp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {currResp.Content.ToString()}");
}
} else if (_eventGridSchema == EventGridSchema.CloudEvents) {
List<CloudEvent> events = new List<CloudEvent>();
foreach (var notification in formattedResults)
{
CloudEvent currEvent = new CloudEvent(evt.QueryId, "Drasi.ChangeEvent", notification);
events.Add(currEvent);
}
var currResp = await _publisherClient.SendEventsAsync(events);
if (currResp.IsError)
{
_logger.LogError($"Error sending message to Event Grid: {currResp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {currResp.Content.ToString()}");
}
}

break;
Expand All @@ -82,4 +113,10 @@ enum OutputFormat
{
Packed,
Unpacked
}
}

enum EventGridSchema
{
CloudEvents,
EventGrid
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,41 @@ public class ControlSignalHandler: IControlEventHandler

private readonly ILogger<ControlSignalHandler> _logger;

private readonly EventGridSchema _eventGridSchema;

public ControlSignalHandler(EventGridPublisherClient publisherClient, IConfiguration config, ILogger<ControlSignalHandler> logger)
{
_publisherClient = publisherClient;
_format = Enum.Parse<OutputFormat>(config.GetValue("format", "packed") ?? "packed", true);
_logger = logger;
_eventGridSchema = Enum.Parse<EventGridSchema>(config.GetValue<string>("eventGridSchema") ?? "CloudEvents", true);
}

public async Task HandleControlSignal(ControlEvent evt, object? queryConfig)
{
switch (_format)
{
case OutputFormat.Packed:
CloudEvent egEvent = new CloudEvent(evt.QueryId, "Drasi.ControlEvent", evt);
var resp = await _publisherClient.SendEventAsync(egEvent);
if (resp.IsError)
if (_eventGridSchema == EventGridSchema.CloudEvents)
{
CloudEvent egEvent = new CloudEvent(evt.QueryId, "Drasi.ControlEvent", evt);
var resp = await _publisherClient.SendEventAsync(egEvent);
if (resp.IsError)
{
_logger.LogError($"Error sending message to Event Grid: {resp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {resp.Content.ToString()}");
}
break;
} else if (_eventGridSchema == EventGridSchema.EventGrid)
{
_logger.LogError($"Error sending message to Event Grid: {resp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {resp.Content.ToString()}");
EventGridEvent egEvent = new EventGridEvent(evt.QueryId, "Drasi.ControlEvent", "1", evt);
var resp = await _publisherClient.SendEventAsync(egEvent);
if (resp.IsError)
{
_logger.LogError($"Error sending message to Event Grid: {resp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {resp.Content.ToString()}");
}
break;
}
break;
case OutputFormat.Unpacked:
Expand All @@ -67,12 +84,26 @@ public async Task HandleControlSignal(ControlEvent evt, object? queryConfig)
}
};

CloudEvent unpackedEvent = new CloudEvent(evt.QueryId, "Drasi.ControlSignal", notification);
var dzresp = await _publisherClient.SendEventAsync(unpackedEvent);
if (dzresp.IsError)
if (_eventGridSchema == EventGridSchema.EventGrid)
{
_logger.LogError($"Error sending message to Event Grid: {dzresp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {dzresp.Content.ToString()}");
EventGridEvent egEvent = new EventGridEvent(evt.QueryId, "Drasi.ControlSignal", "1", notification);
var resp = await _publisherClient.SendEventAsync(egEvent);
if (resp.IsError)
{
_logger.LogError($"Error sending message to Event Grid: {resp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {resp.Content.ToString()}");
}
break;
} else if (_eventGridSchema == EventGridSchema.CloudEvents)
{
CloudEvent unpackedEvent = new CloudEvent(evt.QueryId, "Drasi.ControlSignal", notification);
var dzresp = await _publisherClient.SendEventAsync(unpackedEvent);
if (dzresp.IsError)
{
_logger.LogError($"Error sending message to Event Grid: {dzresp.Content.ToString()}");
throw new Exception($"Error sending message to Event Grid: {dzresp.Content.ToString()}");
}
break;
}
break;
default:
Expand Down
Loading