Skip to content

Commit

Permalink
feat: add property to filter provenance events by processors types (#78)
Browse files Browse the repository at this point in the history
* feat: add property to filter provenance events by processors types

* add default values + some minor fixes

* minor fix
  • Loading branch information
ranim-n authored Nov 5, 2024
1 parent 0569290 commit 6431b64
Showing 1 changed file with 68 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Tags({"elasticsearch", "provenance"})
@CapabilityDescription("A provenance reporting task that writes to Elasticsearch")
public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter {

/**
 * Processor type names that make up the default value of the
 * {@link #PROCESSORS_TYPES_ALLOWLIST} property (joined there with commas).
 */
static final List<String> DEFAULT_PROCESSORS_TYPES_ALLOWLIST = Arrays.asList(
"DeleteSFTP", "ExecuteSQLRecord", "ExtendedValidateCsv", "FetchFTP",
"FetchSFTP", "FetchSmb", "GenerateFlowFile", "GetFTP", "GetSFTP", "GetSmbFile", "InvokeHTTP", "ListenFTP",
"ListFTP", "ListSFTP", "ListSmb", "PutFTP", "PutSFTP", "PutSmbFile"
);

public static final PropertyDescriptor ELASTICSEARCH_URL = new PropertyDescriptor
.Builder().name("Elasticsearch URL")
.displayName("Elasticsearch URL")
Expand All @@ -62,6 +70,15 @@ public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

/**
 * Comma-separated list of processor types whose provenance events are all indexed.
 * For a processor type that is not listed, {@code indexEvents} only indexes events whose
 * status is {@code "Error"}. Defaults to {@link #DEFAULT_PROCESSORS_TYPES_ALLOWLIST}.
 */
public static final PropertyDescriptor PROCESSORS_TYPES_ALLOWLIST = new PropertyDescriptor
.Builder().name("Processors Types Allowlist")
.displayName("Processors Types Allowlist")
.description("Specifies a comma-separated list of processors types for which all provenance events "
+ "will be sent. If the processor type is not in the list, only error events will be sent.")
.defaultValue(String.join(",", DEFAULT_PROCESSORS_TYPES_ALLOWLIST))
// NOTE(review): the two boolean flags are presumably trimEntries / excludeEmptyEntries — confirm against the NiFi API.
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.NON_BLANK_VALIDATOR))
.build();

private final Map<String, ElasticsearchClient> esClients = new HashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();

Expand All @@ -85,56 +102,73 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
descriptors = super.getSupportedPropertyDescriptors();
descriptors.add(ELASTICSEARCH_URL);
descriptors.add(ELASTICSEARCH_INDEX);
descriptors.add(PROCESSORS_TYPES_ALLOWLIST);
return descriptors;
}

public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException {
final String elasticsearchUrl = context.getProperty(ELASTICSEARCH_URL).getValue();
final String elasticsearchIndex = context.getProperty(ELASTICSEARCH_INDEX).evaluateAttributeExpressions().getValue();
final ElasticsearchClient client = getElasticsearchClient(elasticsearchUrl);
final List<String> processorTypesAllowlist =
Arrays.asList(context.getProperty(PROCESSORS_TYPES_ALLOWLIST).getValue().split(","));

events.forEach(event -> {
final String id = Long.toString((Long) event.get("event_id"));

if (!event.containsKey("process_group_name") || !event.containsKey("component_name")) {
getLogger().warn("Provenance event has no process group or processor, ignoring");
return;
}

Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_id", event.get("event_id"));
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
try {
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
} catch (JsonProcessingException e) {
getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e);
if(!event.containsKey("component_type")) {
getLogger().warn("Provenance event has no component type, ignoring");
return;
}
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));

final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException | IOException ex) {
getLogger().error("Error while indexing event {}", id, ex);

final String componentType = event.get("component_type").toString();
final String status = event.get("status").toString();

if(processorTypesAllowlist.contains(componentType)|| status.equals("Error")) {

Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_id", event.get("event_id"));
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
try {
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
} catch (JsonProcessingException e) {
getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e);
}
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));

final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException | IOException ex) {
getLogger().error("Error while indexing event {}", id, ex);
}

}


});
}
}

0 comments on commit 6431b64

Please sign in to comment.