feat: Batch email (#68)
* feat: Batch email

* fixes in Elasticsearch provenance reporter

* Update nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/EmailProvenanceReporter.java

Co-authored-by: Benoit Orihuela <[email protected]>

* Update nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/EmailProvenanceReporter.java

Co-authored-by: Benoit Orihuela <[email protected]>

* changed property name + removed method

* removed the runtime exception

* fix: correct indentations and fix some text and typos

---------

Co-authored-by: Benoit Orihuela <[email protected]>
ranim-n and bobeal authored Sep 18, 2024
1 parent f8bf964 commit 67765f5
Showing 3 changed files with 130 additions and 62 deletions.
AbstractProvenanceReporter.java
@@ -112,6 +112,7 @@ private void processProvenanceEvents(ReportingContext context) {
final List<String> detailsAsError =
Arrays.asList(context.getProperty(DETAILS_AS_ERROR).getValue().toLowerCase().split(","));
final String nifiUrl = context.getProperty(NIFI_URL).getValue();
final List<Map<String, Object>> allSources = new ArrayList<>();

consumer.consumeEvents(context, ((componentMapHolder, provenanceEventRecords) -> {
getLogger().debug("Starting to consume events");
@@ -229,16 +230,18 @@ private void processProvenanceEvents(ReportingContext context) {
source.put("view_input_content_uri", viewContentUri + "/input");
source.put("view_output_content_uri", viewContentUri + "/output");

try {
indexEvent(source, context);
} catch (IOException ex) {
getLogger().error("Failed to publish provenance event", e);
}
allSources.add(source);
}

try {
indexEvents(allSources, context);
} catch (IOException ex) {
getLogger().error("Failed to publish provenance event", ex);
}
}));
}

public abstract void indexEvent(final Map<String, Object> event, final ReportingContext context) throws IOException;
public abstract void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException;

@Override
public void onTrigger(final ReportingContext context) {
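
For orientation, a minimal sketch of what a concrete reporter now has to provide against the batched indexEvents hook declared above, which is invoked once per consumeEvents callback with all collected events. Only the indexEvents(List<Map<String, Object>>, ReportingContext) signature comes from this diff; the class name LogProvenanceReporter, the package placement, the method body, and the omission of any other members of AbstractProvenanceReporter are illustrative assumptions.

package io.egm.nifi.reporting;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.nifi.reporting.ReportingContext;

// Illustrative only: a hypothetical reporter that simply logs each event in the batch.
public class LogProvenanceReporter extends AbstractProvenanceReporter {

    @Override
    public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException {
        // Events now arrive as a whole batch per consumeEvents callback,
        // so an implementation can publish them in a single round trip.
        for (final Map<String, Object> event : events) {
            getLogger().info("Provenance event {} on component {}",
                    event.get("event_id"), event.get("component_name"));
        }
    }
}
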
ElasticsearchProvenanceReporter.java
@@ -22,6 +22,7 @@
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpHost;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -87,46 +88,52 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}

public void indexEvent(final Map<String, Object> event, final ReportingContext context) throws IOException {
public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException {
final String elasticsearchUrl = context.getProperty(ELASTICSEARCH_URL).getValue();
final String elasticsearchIndex = context.getProperty(ELASTICSEARCH_INDEX).evaluateAttributeExpressions().getValue();
final ElasticsearchClient client = getElasticsearchClient(elasticsearchUrl);
final String id = Long.toString((Long) event.get("event_id"));
events.forEach(event -> {
final String id = Long.toString((Long) event.get("event_id"));

if (!event.containsKey("process_group_name") || !event.containsKey("component_name")) {
getLogger().warn("Provenance event has no process group or processor, ignoring");
return;
}
if (!event.containsKey("process_group_name") || !event.containsKey("component_name")) {
getLogger().warn("Provenance event has no process group or processor, ignoring");
return;
}

Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));
Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
try {
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
} catch (JsonProcessingException e) {
getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e);
}
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));

final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException ex) {
getLogger().error("Error while indexing event {}", id, ex);
}
final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException | IOException ex) {
getLogger().error("Error while indexing event {}", id, ex);
}
});
}
}
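
The committed indexEvents still issues one IndexRequest per event inside the forEach. If a single round trip per batch is preferred, the same Elasticsearch Java client also exposes a _bulk endpoint; the following is a hedged alternative sketch, not part of this commit. The helper class BulkIndexSketch, the method bulkIndex, and the id-to-document map are assumptions; each prepared event map is assumed to be built exactly as in the method above.

import java.io.IOException;
import java.util.Map;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;

// Alternative sketch (not part of this commit): index the whole batch with a single _bulk call.
final class BulkIndexSketch {

    // preparedEventsById pairs each document id with the prepared event map built in indexEvents() above.
    static BulkResponse bulkIndex(final ElasticsearchClient client,
                                  final String index,
                                  final Map<String, Map<String, Object>> preparedEventsById) throws IOException {
        final BulkRequest.Builder bulk = new BulkRequest.Builder();
        preparedEventsById.forEach((id, preparedEvent) ->
                bulk.operations(op -> op.index(idx -> idx
                        .index(index)
                        .id(id)
                        .document(preparedEvent))));
        // Callers can check errors() on the returned BulkResponse; items() carries the per-document results.
        return client.bulk(bulk.build());
    }
}
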
EmailProvenanceReporter.java
@@ -18,6 +18,7 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Tags({"email", "provenance", "smtp"})
@@ -148,6 +149,17 @@ public class EmailProvenanceReporter extends AbstractProvenanceReporter {
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor GROUP_SIMILAR_ERRORS = new PropertyDescriptor.Builder()
.name("Group Similar Errors")
.displayName("Group Similar Errors")
.description("Specifies whether to group similar error events into a single email or not. " +
"Set to true to receive a single email with grouped errors. " +
"Set to false to receive an email for each error. " +
"The grouping is done by processor id and error information (event type and details)")
.required(false)
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -168,6 +180,7 @@ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
descriptors.add(SPECIFIC_RECIPIENT_ATTRIBUTE_NAME);
descriptors.add(INPUT_CHARACTER_SET);
descriptors.add(EMAIL_SUBJECT_PREFIX);
descriptors.add(GROUP_SIMILAR_ERRORS);

return descriptors;
}
@@ -307,14 +320,19 @@ private String getSpecificRecipientValue(final ReportingContext context, final M
return eventPreviousAttributes.get(specificRecipientAttributeName);
}

private String composeMessageContent(final Map<String, Object> event) {
private String composeMessageContent(final Map<String, Object> event, Boolean groupSimilarErrors, int groupedEventsSize) {
final StringBuilder message = new StringBuilder();

message.append("Affected processor:\n")
.append("\tProcessor name: ").append(event.get("component_name")).append("\n")
.append("\tProcessor type: ").append(event.get("component_type")).append("\n")
.append("\tProcess group: ").append(event.get("process_group_name")).append("\n")
.append("\tURL: ").append(event.get("component_url")).append("\n");
.append("\tProcess group: ").append(event.get("process_group_name")).append("\n");

if (groupSimilarErrors) {
message.append("\tTotal similar errors : ").append(groupedEventsSize).append("\n");
}

message.append("\tURL: ").append(event.get("component_url")).append("\n");

message.append("\n");
message.append("Error information:\n")
@@ -333,7 +351,7 @@ private String composeMessageContent(final Map<String, Object> event) {
Map<String, String> previousAttributes = (Map<String, String>) event.get("previous_attributes");
message.append("\nFlow file - Previous attributes:\n");
previousAttributes.keySet().stream().sorted().forEach(attributeName ->
message.append(String.format("\t%1$s: %2$s\n", attributeName, previousAttributes.get(attributeName)))
message.append(String.format("\t%1$s: %2$s\n", attributeName, previousAttributes.get(attributeName)))
);
}

@@ -348,28 +366,68 @@ private String composeMessageContent(final Map<String, Object> event) {
}

@Override
public void indexEvent(final Map<String, Object> event, final ReportingContext context) {
try {
// Send the email message only if it is an error event
if (event.containsKey("status") && event.get("status").equals("Error")) {
sendErrorEmail(event, context);
public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) {
List<Map<String, Object>> errorEvents = filterErrorEvents(events);

if (context.getProperty(GROUP_SIMILAR_ERRORS).asBoolean()) {
// Group all error events to send in a single batch email
errorEvents.stream()
.collect(Collectors.groupingBy(this::groupingKeys))
.forEach((groupingKeys, groupedEvents) -> {
try {
sendErrorEmail(groupedEvents.get(0), context, groupedEvents.size());
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
});
} else {
// Send individual emails for each error event
for (Map<String, Object> event : errorEvents) {
try {
sendErrorEmail(event, context, 0);
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
}
} catch (MessagingException e) {
getLogger().error("Error sending error email: " + e.getMessage(), e);
}
}

public void sendErrorEmail(Map<String, Object> event, ReportingContext context) throws MessagingException {
String emailSubject;
if (context.getProperty(EMAIL_SUBJECT_PREFIX).getValue() != null) {
emailSubject = "[" + context.getProperty(EMAIL_SUBJECT_PREFIX).getValue() + "] "
+ "Error occurred in processor " + event.get("component_name") + " "
+ "in process group " + event.get("process_group_name");
} else {
emailSubject = "Error occurred in processor " + event.get("component_name") + " "
+ "in process group " + event.get("process_group_name");
private List<Map<String, Object>> filterErrorEvents(final List<Map<String, Object>> events) {
return events.stream()
.filter(event -> "Error".equals(event.get("status")))
.collect(Collectors.toList());
}


private Map<String, String> groupingKeys(Map<String, Object> event) {
return Map.of(
"component_id", event.get("component_id").toString(),
"details", event.get("details").toString(),
"event_type", event.get("event_type").toString()
);
}

public void sendErrorEmail(Map<String, Object> event, ReportingContext context, int groupedEventsSize) throws MessagingException {

String subjectPrefix = context.getProperty(EMAIL_SUBJECT_PREFIX).getValue();
Boolean groupSimilarErrors = context.getProperty(GROUP_SIMILAR_ERRORS).asBoolean();
StringBuilder emailSubjectBuilder = new StringBuilder();

if (subjectPrefix != null) {
emailSubjectBuilder.append("[").append(subjectPrefix).append("] ");
}

if (groupSimilarErrors) {
emailSubjectBuilder.append(groupedEventsSize).append(" errors occurred in processor ")
.append(event.get("component_name")).append(" in process group ")
.append(event.get("process_group_name"));

} else {
emailSubjectBuilder.append("Error occurred in processor ")
.append(event.get("component_name")).append(" in process group ")
.append(event.get("process_group_name"));
}
String emailSubject = emailSubjectBuilder.toString();

final Properties properties = this.getEmailProperties(context);
final Session mailSession = this.createMailSession(properties, context);
@@ -397,7 +455,7 @@ public void sendErrorEmail(Map<String, Object> event, ReportingContext context)
this.setMessageHeader("X-Mailer", context.getProperty(HEADER_XMAILER).getValue(), message);
message.setSubject(emailSubject);

final String messageText = composeMessageContent(event);
final String messageText = composeMessageContent(event, groupSimilarErrors, groupedEventsSize);

final String contentType = context.getProperty(CONTENT_TYPE).getValue();
final Charset charset = getCharset(context);
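
To make the new grouping concrete, here is a stand-alone sketch of the key used when Group Similar Errors is enabled. The GroupingDemo class and the sample events are invented for illustration; the key itself mirrors groupingKeys() above (processor id, event type, details), and each resulting group corresponds to one email whose subject carries the group size, e.g. "2 errors occurred in processor ...".

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative only: shows how error events collapse into groups when grouping is enabled.
public final class GroupingDemo {

    public static void main(final String[] args) {
        // Made-up error events; in the reporter these maps come from the provenance batch.
        final List<Map<String, Object>> errorEvents = List.of(
                Map.of("component_id", "proc-1", "event_type", "DROP", "details", "Connection refused"),
                Map.of("component_id", "proc-1", "event_type", "DROP", "details", "Connection refused"),
                Map.of("component_id", "proc-2", "event_type", "ROUTE", "details", "Timeout"));

        errorEvents.stream()
                // Same key as groupingKeys(): processor id plus error information.
                .collect(Collectors.groupingBy(event -> Map.of(
                        "component_id", event.get("component_id").toString(),
                        "details", event.get("details").toString(),
                        "event_type", event.get("event_type").toString())))
                // One email per group; the group size feeds the subject and the "Total similar errors" line.
                .forEach((key, group) -> System.out.println(key + " -> " + group.size() + " event(s)"));
    }
}
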
