Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Batch email #68

Merged
merged 7 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private void processProvenanceEvents(ReportingContext context) {
final List<String> detailsAsError =
Arrays.asList(context.getProperty(DETAILS_AS_ERROR).getValue().toLowerCase().split(","));
final String nifiUrl = context.getProperty(NIFI_URL).getValue();
final List<Map<String, Object>> allSources = new ArrayList<>();

consumer.consumeEvents(context, ((componentMapHolder, provenanceEventRecords) -> {
getLogger().debug("Starting to consume events");
Expand Down Expand Up @@ -229,16 +230,18 @@ private void processProvenanceEvents(ReportingContext context) {
source.put("view_input_content_uri", viewContentUri + "/input");
source.put("view_output_content_uri", viewContentUri + "/output");

try {
indexEvent(source, context);
} catch (IOException ex) {
getLogger().error("Failed to publish provenance event", e);
}
allSources.add(source);
}

try {
indexEvents(allSources, context);
} catch (IOException ex) {
getLogger().error("Failed to publish provenance event", ex);
}
}));
}

public abstract void indexEvent(final Map<String, Object> event, final ReportingContext context) throws IOException;
public abstract void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException;

@Override
public void onTrigger(final ReportingContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpHost;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
Expand Down Expand Up @@ -87,46 +88,52 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}

public void indexEvent(final Map<String, Object> event, final ReportingContext context) throws IOException {
public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException {
final String elasticsearchUrl = context.getProperty(ELASTICSEARCH_URL).getValue();
final String elasticsearchIndex = context.getProperty(ELASTICSEARCH_INDEX).evaluateAttributeExpressions().getValue();
final ElasticsearchClient client = getElasticsearchClient(elasticsearchUrl);
final String id = Long.toString((Long) event.get("event_id"));
events.forEach(event -> {
final String id = Long.toString((Long) event.get("event_id"));

if (!event.containsKey("process_group_name") || !event.containsKey("component_name")) {
getLogger().warn("Provenance event has no process group or processor, ignoring");
return;
}
if (!event.containsKey("process_group_name") || !event.containsKey("component_name")) {
getLogger().warn("Provenance event has no process group or processor, ignoring");
return;
}

Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));
Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
try {
bobeal marked this conversation as resolved.
Show resolved Hide resolved
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
} catch (JsonProcessingException e) {
getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e);
}
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));

final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException ex) {
getLogger().error("Error while indexing event {}", id, ex);
}
final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException | IOException ex) {
getLogger().error("Error while indexing event {}", id, ex);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Tags({"email", "provenance", "smtp"})
Expand Down Expand Up @@ -148,6 +149,17 @@ public class EmailProvenanceReporter extends AbstractProvenanceReporter {
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor GROUP_SIMILAR_ERRORS = new PropertyDescriptor.Builder()
.name("Group Similar Errors")
.displayName("Group Similar Errors")
.description("Specifies whether to group similar error events into a single email or not. " +
"Set to true to receive a single email with grouped errors. " +
"Set to false to receive an email for each error. " +
"The grouping is done by processor id and error information (event type and details)")
.required(false)
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
Expand All @@ -168,6 +180,7 @@ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
descriptors.add(SPECIFIC_RECIPIENT_ATTRIBUTE_NAME);
descriptors.add(INPUT_CHARACTER_SET);
descriptors.add(EMAIL_SUBJECT_PREFIX);
descriptors.add(GROUP_SIMILAR_ERRORS);

return descriptors;
}
Expand Down Expand Up @@ -307,14 +320,19 @@ private String getSpecificRecipientValue(final ReportingContext context, final M
return eventPreviousAttributes.get(specificRecipientAttributeName);
}

private String composeMessageContent(final Map<String, Object> event) {
private String composeMessageContent(final Map<String, Object> event, Boolean groupSimilarErrors, int groupedEventsSize) {
final StringBuilder message = new StringBuilder();

message.append("Affected processor:\n")
.append("\tProcessor name: ").append(event.get("component_name")).append("\n")
.append("\tProcessor type: ").append(event.get("component_type")).append("\n")
.append("\tProcess group: ").append(event.get("process_group_name")).append("\n")
.append("\tURL: ").append(event.get("component_url")).append("\n");
.append("\tProcess group: ").append(event.get("process_group_name")).append("\n");

if (groupSimilarErrors) {
message.append("\tTotal similar errors : ").append(groupedEventsSize).append("\n");
}

message.append("\tURL: ").append(event.get("component_url")).append("\n");

message.append("\n");
message.append("Error information:\n")
Expand All @@ -333,7 +351,7 @@ private String composeMessageContent(final Map<String, Object> event) {
Map<String, String> previousAttributes = (Map<String, String>) event.get("previous_attributes");
message.append("\nFlow file - Previous attributes:\n");
previousAttributes.keySet().stream().sorted().forEach(attributeName ->
message.append(String.format("\t%1$s: %2$s\n", attributeName, previousAttributes.get(attributeName)))
message.append(String.format("\t%1$s: %2$s\n", attributeName, previousAttributes.get(attributeName)))
);
}

Expand All @@ -348,28 +366,68 @@ private String composeMessageContent(final Map<String, Object> event) {
}

@Override
public void indexEvent(final Map<String, Object> event, final ReportingContext context) {
try {
// Send the email message only if it is an error event
if (event.containsKey("status") && event.get("status").equals("Error")) {
sendErrorEmail(event, context);
public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) {
List<Map<String, Object>> errorEvents = filterErrorEvents(events);

if (context.getProperty(GROUP_SIMILAR_ERRORS).asBoolean()) {
// Group all error events to send in a single batch email
events.stream()
.collect(Collectors.groupingBy(this::groupingKeys))
.forEach((groupingKeys, groupedEvents) -> {
try {
sendErrorEmail(groupedEvents.get(0), context, groupedEvents.size());
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
});
} else {
// Send individual emails for each error event
for (Map<String, Object> event : errorEvents) {
try {
sendErrorEmail(event, context, 0);
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
}
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
}

public void sendErrorEmail(Map<String, Object> event, ReportingContext context) throws MessagingException {
String emailSubject;
if (context.getProperty(EMAIL_SUBJECT_PREFIX).getValue() != null) {
emailSubject = "[" + context.getProperty(EMAIL_SUBJECT_PREFIX).getValue() + "] "
+ "Error occurred in processor " + event.get("component_name") + " "
+ "in process group " + event.get("process_group_name");
} else {
emailSubject = "Error occurred in processor " + event.get("component_name") + " "
+ "in process group " + event.get("process_group_name");
private List<Map<String, Object>> filterErrorEvents(final List<Map<String, Object>> events) {
return events.stream()
.filter(event -> "Error".equals(event.get("status")))
.collect(Collectors.toList());
}


private Map<String, String> groupingKeys(Map<String, Object> event) {
return Map.of(
"component_id", event.get("component_id").toString(),
"details", event.get("details").toString(),
"event_type", event.get("event_type").toString()
);
}

public void sendErrorEmail(Map<String, Object> event, ReportingContext context, int groupedEventsSize) throws MessagingException {

String subjectPrefix = context.getProperty(EMAIL_SUBJECT_PREFIX).getValue();
Boolean groupSimilarErrors = context.getProperty(GROUP_SIMILAR_ERRORS).asBoolean();
StringBuilder emailSubjectBuilder = new StringBuilder();

if (subjectPrefix != null) {
emailSubjectBuilder.append("[").append(subjectPrefix).append("] ");
}

if (groupSimilarErrors) {
emailSubjectBuilder.append(groupedEventsSize).append(" errors occurred in processor ")
.append(event.get("component_name")).append(" in process group ")
.append(event.get("process_group_name"));

} else {
emailSubjectBuilder.append("Error occurred in processor ")
.append(event.get("component_name")).append(" in process group ")
.append(event.get("process_group_name"));
}
String emailSubject = emailSubjectBuilder.toString();

final Properties properties = this.getEmailProperties(context);
final Session mailSession = this.createMailSession(properties, context);
Expand Down Expand Up @@ -397,7 +455,7 @@ public void sendErrorEmail(Map<String, Object> event, ReportingContext context)
this.setMessageHeader("X-Mailer", context.getProperty(HEADER_XMAILER).getValue(), message);
message.setSubject(emailSubject);

final String messageText = composeMessageContent(event);
final String messageText = composeMessageContent(event, groupSimilarErrors, groupedEventsSize);

final String contentType = context.getProperty(CONTENT_TYPE).getValue();
final Charset charset = getCharset(context);
Expand Down
Loading