diff --git a/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java b/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java index 9b13528..8195f37 100644 --- a/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java +++ b/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -43,6 +44,13 @@ @Tags({"elasticsearch", "provenance"}) @CapabilityDescription("A provenance reporting task that writes to Elasticsearch") public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter { + + static final List DEFAULT_PROCESSORS_TYPES_ALLOWLIST = Arrays.asList( + "DeleteSFTP", "ExecuteSQLRecord", "ExtendedValidateCsv", "FetchFTP", + "FetchSFTP", "FetchSmb", "GenerateFlowFile", "GetFTP", "GetSFTP", "GetSmbFile", "InvokeHTTP", "ListenFTP", + "ListFTP", "ListSFTP", "ListSmb", "PutFTP", "PutSFTP", "PutSmbFile" + ); + public static final PropertyDescriptor ELASTICSEARCH_URL = new PropertyDescriptor .Builder().name("Elasticsearch URL") .displayName("Elasticsearch URL") @@ -62,6 +70,15 @@ public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor PROCESSORS_TYPES_ALLOWLIST = new PropertyDescriptor + .Builder().name("Processors Types Allowlist") + .displayName("Processors Types Allowlist") + .description("Specifies a comma-separated list of processors types for which all provenance events " + + "will be sent. If the processor type is not in the list, only error events will be sent.") + .defaultValue(String.join(",", DEFAULT_PROCESSORS_TYPES_ALLOWLIST)) + .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.NON_BLANK_VALIDATOR)) + .build(); + private final Map esClients = new HashMap<>(); private final ObjectMapper objectMapper = new ObjectMapper(); @@ -85,6 +102,7 @@ public final List getSupportedPropertyDescriptors() { descriptors = super.getSupportedPropertyDescriptors(); descriptors.add(ELASTICSEARCH_URL); descriptors.add(ELASTICSEARCH_INDEX); + descriptors.add(PROCESSORS_TYPES_ALLOWLIST); return descriptors; } @@ -92,6 +110,9 @@ public void indexEvents(final List> events, final ReportingC final String elasticsearchUrl = context.getProperty(ELASTICSEARCH_URL).getValue(); final String elasticsearchIndex = context.getProperty(ELASTICSEARCH_INDEX).evaluateAttributeExpressions().getValue(); final ElasticsearchClient client = getElasticsearchClient(elasticsearchUrl); + final List processorTypesAllowlist = + Arrays.asList(context.getProperty(PROCESSORS_TYPES_ALLOWLIST).getValue().split(",")); + events.forEach(event -> { final String id = Long.toString((Long) event.get("event_id")); @@ -99,42 +120,55 @@ public void indexEvents(final List> events, final ReportingC getLogger().warn("Provenance event has no process group or processor, ignoring"); return; } - - Map preparedEvent = new HashMap<>(); - preparedEvent.put("event_id", event.get("event_id")); - preparedEvent.put("event_time_millis", event.get("event_time")); - preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc")); - preparedEvent.put("event_type", event.get("event_type")); - preparedEvent.put("component_type", event.get("component_type")); - preparedEvent.put("component_url", event.get("component_url")); - preparedEvent.put("component_name", event.get("component_name")); - preparedEvent.put("process_group_name", event.get("process_group_name")); - preparedEvent.put("process_group_id", event.get("process_group_id")); - preparedEvent.put("status", event.get("status")); - preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri")); - preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri")); - preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri")); - preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri")); - try { - preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes"))); - preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes"))); - } catch (JsonProcessingException e) { - getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e); + if(!event.containsKey("component_type")) { + getLogger().warn("Provenance event has no component type, ignoring"); + return; } - if (event.containsKey("details")) - preparedEvent.put("details", event.get("details")); - - final IndexRequest> indexRequest = new - IndexRequest.Builder>() - .index(elasticsearchIndex) - .id(id) - .document(preparedEvent) - .build(); - try { - client.index(indexRequest); - } catch (ElasticsearchException | IOException ex) { - getLogger().error("Error while indexing event {}", id, ex); + + final String componentType = event.get("component_type").toString(); + final String status = event.get("status").toString(); + + if(processorTypesAllowlist.contains(componentType)|| status.equals("Error")) { + + Map preparedEvent = new HashMap<>(); + preparedEvent.put("event_id", event.get("event_id")); + preparedEvent.put("event_time_millis", event.get("event_time")); + preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc")); + preparedEvent.put("event_type", event.get("event_type")); + preparedEvent.put("component_type", event.get("component_type")); + preparedEvent.put("component_url", event.get("component_url")); + preparedEvent.put("component_name", event.get("component_name")); + preparedEvent.put("process_group_name", event.get("process_group_name")); + preparedEvent.put("process_group_id", event.get("process_group_id")); + preparedEvent.put("status", event.get("status")); + preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri")); + preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri")); + preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri")); + preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri")); + try { + preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes"))); + preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes"))); + } catch (JsonProcessingException e) { + getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e); + } + if (event.containsKey("details")) + preparedEvent.put("details", event.get("details")); + + final IndexRequest> indexRequest = new + IndexRequest.Builder>() + .index(elasticsearchIndex) + .id(id) + .document(preparedEvent) + .build(); + try { + client.index(indexRequest); + } catch (ElasticsearchException | IOException ex) { + getLogger().error("Error while indexing event {}", id, ex); + } + } + + }); } }