Skip to content

Commit

Permalink
1. 完善日志记录
Browse files Browse the repository at this point in the history
2. 调整DomainEventStream的ID的生成方式
3. 校验EventCommittingContextMailBox.EqueueMessage时EventStream.Id是否重复
  • Loading branch information
tangxuehua committed Aug 5, 2020
1 parent 38ea729 commit a1d3efb
Show file tree
Hide file tree
Showing 21 changed files with 113 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ public ApplicationMessageConsumer Shutdown()

void IQueueMessageHandler.Handle(QueueMessage queueMessage, IMessageContext context)
{
var applicationMessageString = Encoding.UTF8.GetString(queueMessage.Body);

_logger.InfoFormat("Received application message equeue message: {0}, applicationMessage: {1}", queueMessage, applicationMessageString);

var applicationMessageType = _typeNameProvider.GetType(queueMessage.Tag);
var message = _jsonSerializer.Deserialize(Encoding.UTF8.GetString(queueMessage.Body), applicationMessageType) as IApplicationMessage;
_logger.InfoFormat("ENode application message received, messageId: {0}, messageType: {1}", message.Id, message.GetType().Name);
var message = _jsonSerializer.Deserialize(applicationMessageString, applicationMessageType) as IApplicationMessage;

_messageDispatcher.DispatchMessageAsync(message).ContinueWith(x =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,16 @@ public ApplicationMessagePublisher Shutdown()
return this;
}
public Task PublishAsync(IApplicationMessage message)
{
var queueMessage = CreateEQueueMessage(message);
return _sendMessageService.SendMessageAsync(Producer, "applicationMessage", message.GetType().Name, queueMessage, message.Id, message.Id, message.Items);
}

private EQueueMessage CreateEQueueMessage(IApplicationMessage message)
{
var topic = _messageTopicProvider.GetTopic(message);
var data = _jsonSerializer.Serialize(message);
return new EQueueMessage(
var equeueMessage = new EQueueMessage(
topic,
(int)EQueueMessageTypeCode.ApplicationMessage,
Encoding.UTF8.GetBytes(data),
_typeNameProvider.GetTypeName(message.GetType()));

return _sendMessageService.SendMessageAsync(Producer, "applicationMessage", message.GetType().Name, equeueMessage, data, message.Id, message.Id, message.Items);
}
}
}
7 changes: 5 additions & 2 deletions src/ENode.EQueue/Command/CommandConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,16 @@ public CommandConsumer Shutdown()

void IQueueMessageHandler.Handle(QueueMessage queueMessage, IMessageContext context)
{
var commandMessageString = Encoding.UTF8.GetString(queueMessage.Body);

_logger.InfoFormat("Received command equeue message: {0}, commandMessage: {1}", queueMessage, commandMessageString);

var commandItems = new Dictionary<string, string>();
var commandMessage = _jsonSerializer.Deserialize<CommandMessage>(Encoding.UTF8.GetString(queueMessage.Body));
var commandMessage = _jsonSerializer.Deserialize<CommandMessage>(commandMessageString);
var commandType = _typeNameProvider.GetType(queueMessage.Tag);
var command = _jsonSerializer.Deserialize(commandMessage.CommandData, commandType) as ICommand;
var commandExecuteContext = new CommandExecuteContext(_repository, _aggregateStorage, queueMessage, context, commandMessage, _sendReplyService);
commandItems["CommandReplyAddress"] = commandMessage.ReplyAddress;
_logger.InfoFormat("ENode command message received, messageId: {0}, commandType: {1}, aggregateRootId: {2}", command.Id, command.GetType().Name, command.AggregateRootId);
_commandProcessor.Process(new ProcessingCommand(command, commandExecuteContext, commandItems));
}

Expand Down
9 changes: 6 additions & 3 deletions src/ENode.EQueue/Command/CommandService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public CommandService Shutdown()
}
public Task SendAsync(ICommand command)
{
return _sendMessageService.SendMessageAsync(Producer, "command", command.GetType().Name, BuildCommandMessage(command, false), command.AggregateRootId, command.Id, command.Items);
var equeueMessage = BuildCommandMessage(command, out string messageBody, false);
return _sendMessageService.SendMessageAsync(Producer, "command", command.GetType().Name, equeueMessage, messageBody, command.AggregateRootId, command.Id, command.Items);
}
public Task<CommandResult> ExecuteAsync(ICommand command)
{
Expand All @@ -79,7 +80,8 @@ public async Task<CommandResult> ExecuteAsync(ICommand command, CommandReturnTyp

try
{
await _sendMessageService.SendMessageAsync(Producer, "command", command.GetType().Name, BuildCommandMessage(command, true), command.AggregateRootId, command.Id, command.Items).ConfigureAwait(false);
var equeueMessage = BuildCommandMessage(command, out string messageBody, true);
await _sendMessageService.SendMessageAsync(Producer, "command", command.GetType().Name, equeueMessage, messageBody, command.AggregateRootId, command.Id, command.Items).ConfigureAwait(false);
}
catch
{
Expand All @@ -90,7 +92,7 @@ public async Task<CommandResult> ExecuteAsync(ICommand command, CommandReturnTyp
return await taskCompletionSource.Task.ConfigureAwait(false);
}

private EQueueMessage BuildCommandMessage(ICommand command, bool needReply = false)
private EQueueMessage BuildCommandMessage(ICommand command, out string messageBody, bool needReply = false)
{
Ensure.NotNull(command.AggregateRootId, "aggregateRootId");
var commandData = _jsonSerializer.Serialize(command);
Expand All @@ -101,6 +103,7 @@ private EQueueMessage BuildCommandMessage(ICommand command, bool needReply = fal
CommandData = commandData,
ReplyAddress = replyAddress
});
messageBody = messageData;
return new EQueueMessage(
topic,
(int)EQueueMessageTypeCode.CommandMessage,
Expand Down
8 changes: 6 additions & 2 deletions src/ENode.EQueue/DomainEvent/DomainEventConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,15 @@ public DomainEventConsumer Shutdown()

void IQueueMessageHandler.Handle(QueueMessage queueMessage, IMessageContext context)
{
var message = _jsonSerializer.Deserialize<EventStreamMessage>(Encoding.UTF8.GetString(queueMessage.Body));
var eventStreamMessageString = Encoding.UTF8.GetString(queueMessage.Body);

_logger.InfoFormat("Received event stream equeue message: {0}, eventStreamMessage: {1}", queueMessage, eventStreamMessageString);

var message = _jsonSerializer.Deserialize<EventStreamMessage>(eventStreamMessageString);
var domainEventStreamMessage = ConvertToDomainEventStream(message);
var processContext = new DomainEventStreamProcessContext(this, domainEventStreamMessage, queueMessage, context);
var processingMessage = new ProcessingEvent(domainEventStreamMessage, processContext);
_logger.InfoFormat("ENode event stream message received, messageId: {0}, aggregateRootId: {1}, aggregateRootType: {2}, version: {3}, evnts: {4}", domainEventStreamMessage.Id, domainEventStreamMessage.AggregateRootId, domainEventStreamMessage.AggregateRootTypeName, domainEventStreamMessage.Version, _jsonSerializer.Serialize(domainEventStreamMessage.Events.Select(x => x.GetType().Name)));

_messageProcessor.Process(processingMessage);
}

Expand Down
11 changes: 4 additions & 7 deletions src/ENode.EQueue/DomainEvent/DomainEventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,16 @@ public DomainEventPublisher Shutdown()
return this;
}
public Task PublishAsync(DomainEventStreamMessage eventStream)
{
var message = CreateEQueueMessage(eventStream);
return _sendMessageService.SendMessageAsync(Producer, "events", string.Join(",", eventStream.Events.Select(x => x.GetType().Name)), message, eventStream.AggregateRootId, eventStream.Id, eventStream.Items);
}

private EQueueMessage CreateEQueueMessage(DomainEventStreamMessage eventStream)
{
Ensure.NotNull(eventStream.AggregateRootId, "aggregateRootId");
var eventMessage = CreateEventMessage(eventStream);
var topic = _eventTopicProvider.GetTopic(eventStream.Events.First());
var data = _jsonSerializer.Serialize(eventMessage);
return new EQueueMessage(topic, (int)EQueueMessageTypeCode.DomainEventStreamMessage, Encoding.UTF8.GetBytes(data));
var equeueMessage = new EQueueMessage(topic, (int)EQueueMessageTypeCode.DomainEventStreamMessage, Encoding.UTF8.GetBytes(data));

return _sendMessageService.SendMessageAsync(Producer, "events", string.Join(",", eventStream.Events.Select(x => x.GetType().Name)), equeueMessage, data, eventStream.AggregateRootId, eventStream.Id, eventStream.Items);
}

private EventStreamMessage CreateEventMessage(DomainEventStreamMessage eventStream)
{
var message = new EventStreamMessage
Expand Down
9 changes: 5 additions & 4 deletions src/ENode.EQueue/DomainException/DomainExceptionConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,16 @@ public DomainExceptionConsumer Shutdown()
void IQueueMessageHandler.Handle(QueueMessage queueMessage, IMessageContext context)
{
var exceptionType = _typeNameProvider.GetType(queueMessage.Tag);
var exceptionMessage = _jsonSerializer.Deserialize<DomainExceptionMessage>(Encoding.UTF8.GetString(queueMessage.Body));
var domainExceptionString = Encoding.UTF8.GetString(queueMessage.Body);

_logger.InfoFormat("Received domain exception equeue message: {0}, domainExceptionMessage: {1}", queueMessage, domainExceptionString);

var exceptionMessage = _jsonSerializer.Deserialize<DomainExceptionMessage>(domainExceptionString);
var exception = FormatterServices.GetUninitializedObject(exceptionType) as IDomainException;
exception.Id = exceptionMessage.UniqueId;
exception.Timestamp = exceptionMessage.Timestamp;
exception.Items = exceptionMessage.Items;
exception.RestoreFrom(exceptionMessage.SerializableInfo);
_logger.InfoFormat("ENode domain exception message received, messageId: {0}, exceptionType: {1}",
exceptionMessage.UniqueId,
exceptionType.Name);

_messageDispatcher.DispatchMessageAsync(exception).ContinueWith(x =>
{
Expand Down
10 changes: 3 additions & 7 deletions src/ENode.EQueue/DomainException/DomainExceptionPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,6 @@ public DomainExceptionPublisher Shutdown()
return this;
}
public Task PublishAsync(IDomainException exception)
{
var message = CreateEQueueMessage(exception);
return _sendMessageService.SendMessageAsync(Producer, "exception", exception.GetType().Name, message, exception.Id, exception.Id, exception.Items);
}

private EQueueMessage CreateEQueueMessage(IDomainException exception)
{
var topic = _exceptionTopicProvider.GetTopic(exception);
var serializableInfo = new Dictionary<string, string>();
Expand All @@ -63,11 +57,13 @@ private EQueueMessage CreateEQueueMessage(IDomainException exception)
Items = exception.Items,
SerializableInfo = serializableInfo
});
return new EQueueMessage(
var equeueMessage = new EQueueMessage(
topic,
(int)EQueueMessageTypeCode.ExceptionMessage,
Encoding.UTF8.GetBytes(data),
_typeNameProvider.GetTypeName(exception.GetType()));

return _sendMessageService.SendMessageAsync(Producer, "exception", exception.GetType().Name, equeueMessage, data, exception.Id, exception.Id, exception.Items);
}
}
}
14 changes: 9 additions & 5 deletions src/ENode.EQueue/SendQueueMessageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ public SendQueueMessageService()
_jsonSerializer = ObjectContainer.Resolve<IJsonSerializer>();
}

public async Task SendMessageAsync(Producer producer, string messageType, string messageClass, EQueueMessage message, string routingKey, string messageId, IDictionary<string, string> messageExtensionItems)
public async Task SendMessageAsync(Producer producer, string messageType, string messageClass, EQueueMessage message, string messageBodyString, string routingKey, string messageId, IDictionary<string, string> messageExtensionItems)
{
try
{
var result = await producer.SendAsync(message, routingKey).ConfigureAwait(false);
if (result.SendStatus == SendStatus.Success)
{
_logger.InfoFormat("ENode {0} message send success, equeueMessageId: {1}, routingKey: {2}, messageType: {3}, messageId: {4}, messageExtensionItems: {5}",
_logger.InfoFormat("ENode {0} message send success, equeueMessageId: {1}, message: {2}, messageBody: {3}, routingKey: {4}, messageType: {5}, messageId: {6}, messageExtensionItems: {7}",
messageType,
result.MessageStoreResult.MessageId,
message,
messageBodyString,
routingKey,
messageClass,
messageId,
Expand All @@ -41,9 +43,10 @@ public async Task SendMessageAsync(Producer producer, string messageType, string
}
else
{
_logger.ErrorFormat("ENode {0} message send failed, message: {1}, sendResult: {2}, routingKey: {3}, messageType: {4}, messageId: {5}, messageExtensionItems: {6}",
_logger.ErrorFormat("ENode {0} message send failed, message: {1}, messageBody: {2}, sendResult: {3}, routingKey: {4}, messageType: {5}, messageId: {6}, messageExtensionItems: {7}",
messageType,
message,
messageBodyString,
result,
routingKey,
messageClass,
Expand All @@ -55,9 +58,10 @@ public async Task SendMessageAsync(Producer producer, string messageType, string
}
catch (Exception ex)
{
_logger.Error(string.Format("ENode {0} message send has exception, message: {1}, routingKey: {2}, messageType: {3}, messageId: {4}, messageExtensionItems: {5}",
_logger.Error(string.Format("ENode {0} message send has exception, message: {1}, messageBody: {2}, routingKey: {3}, messageType: {4}, messageId: {5}, messageExtensionItems: {6}",
messageType,
message,
message,
messageBodyString,
routingKey,
messageClass,
messageId,
Expand Down
2 changes: 1 addition & 1 deletion src/ENode.MySQL/MySqlTableGenerateSQL.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ CREATE TABLE `PublishedVersion` (
`CreatedOn` datetime NOT NULL,
`UpdatedOn` datetime NOT NULL,
PRIMARY KEY (`Sequence`),
UNIQUE KEY `IX_PublishedVersion_AggId_Version` (`ProcessorName`,`AggregateRootId`,`Version`)
UNIQUE KEY `IX_PublishedVersion_AggId_Version` (`ProcessorName`,`AggregateRootId`)
) ENGINE=InnoDB AUTO_INCREMENT=18 DEFAULT CHARSET=utf8;

CREATE TABLE `LockKey` (
Expand Down
9 changes: 6 additions & 3 deletions src/ENode/Commanding/Impl/DefaultCommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,28 @@
using System.Threading;
using ECommon.Logging;
using ECommon.Scheduling;
using ECommon.Serializing;
using ENode.Configurations;

namespace ENode.Commanding.Impl
{
public class DefaultCommandProcessor : ICommandProcessor
{
private readonly object _lockObj = new object();
private readonly IJsonSerializer _jsonSerializer;
private readonly ILogger _logger;
private readonly ConcurrentDictionary<string, ProcessingCommandMailbox> _mailboxDict;
private readonly IProcessingCommandHandler _handler;
private readonly IScheduleService _scheduleService;
private readonly int _timeoutSeconds;
private readonly string _taskName;

public DefaultCommandProcessor(IScheduleService scheduleService, IProcessingCommandHandler handler, ILoggerFactory loggerFactory)
public DefaultCommandProcessor(IScheduleService scheduleService, IProcessingCommandHandler handler, IJsonSerializer jsonSerializer, ILoggerFactory loggerFactory)
{
_scheduleService = scheduleService;
_mailboxDict = new ConcurrentDictionary<string, ProcessingCommandMailbox>();
_handler = handler;
_jsonSerializer = jsonSerializer;
_logger = loggerFactory.Create(GetType().FullName);
_timeoutSeconds = ENodeConfiguration.Instance.Setting.AggregateRootMaxInactiveSeconds;
_taskName = "CleanInactiveProcessingCommandMailBoxes_" + DateTime.Now.Ticks + new Random().Next(10000);
Expand All @@ -38,7 +41,7 @@ public void Process(ProcessingCommand processingCommand)

var mailbox = _mailboxDict.GetOrAdd(aggregateRootId, x =>
{
return new ProcessingCommandMailbox(x, _handler, _logger);
return new ProcessingCommandMailbox(x, _handler, _jsonSerializer, _logger);
});

var mailboxTryUsingCount = 0L;
Expand All @@ -53,7 +56,7 @@ public void Process(ProcessingCommand processingCommand)
}
if (mailbox.IsRemoved)
{
mailbox = new ProcessingCommandMailbox(aggregateRootId, _handler, _logger);
mailbox = new ProcessingCommandMailbox(aggregateRootId, _handler, _jsonSerializer, _logger);
_mailboxDict.TryAdd(aggregateRootId, mailbox);
}
mailbox.EnqueueMessage(processingCommand);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private Task HandleCommandInternal(ProcessingCommand processingCommand, ICommand
}
catch (Exception ex)
{
_logger.Error(string.Format("Commit aggregate changes has unknown exception, handlerType:{0}, commandType:{1}, commandId:{2}, aggregateRootId:{3}",
_logger.Error(string.Format("Commit aggregate changes has unknown exception, this should not be happen, and we just complete the command, handlerType:{0}, commandType:{1}, commandId:{2}, aggregateRootId:{3}",
commandHandler.GetInnerObject().GetType().Name,
command.GetType().Name,
command.Id,
Expand Down
Loading

0 comments on commit a1d3efb

Please sign in to comment.