Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add property to filter provenance events by processors types #78

Merged
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Tags({"elasticsearch", "provenance"})
@CapabilityDescription("A provenance reporting task that writes to Elasticsearch")
public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter {

static final List<String> DEFAULT_PROCESSORS_TYPES_ALLOWLIST = Arrays.asList(
"DeleteSFTP", "ExecuteSQLRecord", "ExtendedValidateCsv", "FetchFTP",
"FetchSFTP", "FetchSmb", "GenerateFlowFile", "GetFTP", "GetSFTP", "GetSmbFile", "InvokeHTTP", "ListenFTP",
"ListFTP", "ListSFTP", "ListSmb", "PutFTP", "PutSFTP", "PutSmbFile"
);

public static final PropertyDescriptor ELASTICSEARCH_URL = new PropertyDescriptor
.Builder().name("Elasticsearch URL")
.displayName("Elasticsearch URL")
Expand All @@ -62,6 +70,14 @@ public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

public static final PropertyDescriptor PROCESSORS_TYPES_ALLOWLIST = new PropertyDescriptor.Builder().name("details-as-error")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"details-as-errror"?

.displayName("Processors Types Allowlist")
.description("Specifies a comma-separated list of processors types for which all provenance events "
+ "will be sent. If the processor type is not in the list, only error events will be sent.")
.defaultValue(String.join(",", DEFAULT_PROCESSORS_TYPES_ALLOWLIST))
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.NON_BLANK_VALIDATOR))
.build();

private final Map<String, ElasticsearchClient> esClients = new HashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();

Expand All @@ -85,56 +101,73 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
descriptors = super.getSupportedPropertyDescriptors();
descriptors.add(ELASTICSEARCH_URL);
descriptors.add(ELASTICSEARCH_INDEX);
descriptors.add(PROCESSORS_TYPES_ALLOWLIST);
return descriptors;
}

public void indexEvents(final List<Map<String, Object>> events, final ReportingContext context) throws IOException {
final String elasticsearchUrl = context.getProperty(ELASTICSEARCH_URL).getValue();
final String elasticsearchIndex = context.getProperty(ELASTICSEARCH_INDEX).evaluateAttributeExpressions().getValue();
final ElasticsearchClient client = getElasticsearchClient(elasticsearchUrl);
final List<String> processorTypesAllowlist =
Arrays.asList(context.getProperty(PROCESSORS_TYPES_ALLOWLIST).getValue().split(","));

events.forEach(event -> {
final String id = Long.toString((Long) event.get("event_id"));

if (!event.containsKey("process_group_name") || !event.containsKey("component_name")) {
getLogger().warn("Provenance event has no process group or processor, ignoring");
return;
}

Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_id", event.get("event_id"));
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
try {
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
} catch (JsonProcessingException e) {
getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e);
if(!event.containsKey("component_type")) {
bobeal marked this conversation as resolved.
Show resolved Hide resolved
getLogger().warn("Provenance event has no component type, ignoring");
return;
}
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));

final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException | IOException ex) {
getLogger().error("Error while indexing event {}", id, ex);

final String componentType = event.get("component_type").toString();
final String status = event.get("status").toString();

if(processorTypesAllowlist.contains(componentType)|| status.equals("Error")) {

Map<String, Object> preparedEvent = new HashMap<>();
preparedEvent.put("event_id", event.get("event_id"));
preparedEvent.put("event_time_millis", event.get("event_time"));
preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
preparedEvent.put("event_type", event.get("event_type"));
preparedEvent.put("component_type", event.get("component_type"));
preparedEvent.put("component_url", event.get("component_url"));
preparedEvent.put("component_name", event.get("component_name"));
preparedEvent.put("process_group_name", event.get("process_group_name"));
preparedEvent.put("process_group_id", event.get("process_group_id"));
preparedEvent.put("status", event.get("status"));
preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
try {
preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
} catch (JsonProcessingException e) {
getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e);
}
if (event.containsKey("details"))
preparedEvent.put("details", event.get("details"));

final IndexRequest<Map<String, Object>> indexRequest = new
IndexRequest.Builder<Map<String, Object>>()
.index(elasticsearchIndex)
.id(id)
.document(preparedEvent)
.build();
try {
client.index(indexRequest);
} catch (ElasticsearchException | IOException ex) {
getLogger().error("Error while indexing event {}", id, ex);
}

}


});
}
}
Loading