Skip to content

Commit

Permalink
Push local changes
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed May 15, 2024
1 parent f676e10 commit 3c71720
Show file tree
Hide file tree
Showing 31 changed files with 2,033 additions and 18 deletions.
58 changes: 57 additions & 1 deletion tif/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import org.opensearch.gradle.test.RestIntegTestTask

/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
Expand All @@ -21,6 +23,10 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.0'
implementation 'com.fasterxml.jackson.module:jackson-module-afterburner:2.17.0'
implementation 'com.google.guava:guava:33.1.0-jre'
implementation 'org.opensearch.client:opensearch-rest-high-level-client:2.13.0'
implementation platform('org.apache.logging.log4j:log4j-bom:2.22.1')
implementation 'org.apache.logging.log4j:log4j-core'
implementation 'org.apache.logging.log4j:log4j-slf4j2-impl'

testImplementation "org.mockito:mockito-inline:5.2.0"
testImplementation "org.mockito:mockito-core:5.11.0"
Expand All @@ -31,4 +37,54 @@ dependencies {

test {
useJUnitPlatform()
}
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/test/java')
}
resources.srcDir file('src/test/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task s3ConnectorIT(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.s3connector.bucket', System.getProperty('tests.s3connector.bucket')
systemProperty 'tests.s3connector.region', System.getProperty('tests.s3connector.region')
systemProperty 'tests.s3connector.roleArn', System.getProperty('tests.s3connector.roleArn')

filter {
includeTestsMatching 'S3ConnectorIT'
}
}

task systemIndexFeedStoreIT(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.opensearch.host', System.getProperty('tests.opensearch.host')
systemProperty 'tests.opensearch.user', System.getProperty('tests.opensearch.user')
systemProperty 'tests.opensearch.password', System.getProperty('tests.opensearch.password')

filter {
includeTestsMatching 'SystemIndexFeedStoreIT'
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ public List<IOC> loadIOCs() {
final GetObjectRequest getObjectRequest = getObjectRequest();
final ResponseInputStream<GetObjectResponse> response = s3Client.getObject(getObjectRequest);

return inputCodec.parse(response);
final List<IOC> iocs = inputCodec.parse(response);
setFeedId(iocs);

return iocs;
}

private GetObjectRequest getObjectRequest() {
Expand All @@ -41,4 +44,8 @@ private GetObjectRequest getObjectRequest() {
.key(s3ConnectorConfig.getObjectKey())
.build();
}

private void setFeedId(final List<IOC> iocs) {
iocs.forEach(ioc -> ioc.setFeedId(s3ConnectorConfig.getFeedId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@ public class S3ConnectorConfig {
private final String roleArn;
private final IOCSchema iocSchema;
private final InputCodecSchema inputCodecSchema;
private final String feedId;

public S3ConnectorConfig(final String bucketName, final String objectKey, final String region,
final String roleArn, final IOCSchema iocSchema, final InputCodecSchema inputCodecSchema) {
final String roleArn, final IOCSchema iocSchema, final InputCodecSchema inputCodecSchema,
final String feedId) {
this.bucketName = bucketName;
this.objectKey = objectKey;
this.region = region;
this.roleArn = roleArn;
this.iocSchema = iocSchema;
this.inputCodecSchema = inputCodecSchema;
this.feedId = feedId;
}

public String getBucketName() {
Expand All @@ -47,4 +50,8 @@ public IOCSchema getIocSchema() {
public InputCodecSchema getInputCodecSchema() {
return inputCodecSchema;
}

public String getFeedId() {
return feedId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.opensearch.securityanalytics.exceptions;

public class FeedStoreException extends RuntimeException {
public FeedStoreException(final String message) {
super(message);
}

public FeedStoreException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.opensearch.securityanalytics.exceptions;

public class IndexAccessorException extends RuntimeException {
public IndexAccessorException(final String message) {
super(message);
}

public IndexAccessorException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.securityanalytics.exceptions;

public class ResourceReaderException extends RuntimeException {
public ResourceReaderException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.feed;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FeedManager {
private static final Logger log = LoggerFactory.getLogger(FeedManager.class);

private final ScheduledExecutorService executorService;
private final Map<String, ScheduledFuture<?>> registeredTasks;

public FeedManager() {
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);

executorService = Executors.unconfigurableScheduledExecutorService(scheduledThreadPoolExecutor);
registeredTasks = new HashMap<>();
}

@VisibleForTesting
FeedManager(final ScheduledExecutorService scheduledExecutorService, final Map<String, ScheduledFuture<?>> registeredTasks) {
this.executorService = scheduledExecutorService;
this.registeredTasks = registeredTasks;
}

public void registerFeedRetriever(final String feedId, final Runnable feedRetriever, final Duration refreshInterval) {
if (registeredTasks.containsKey(feedId)) {
log.warn("Field with ID {} already has a retriever registered. Will replace existing feed retriever with new definition.", feedId);
deregisterFeedRetriever(feedId);
}

final ScheduledFuture<?> retrieverFuture = executorService.scheduleAtFixedRate(feedRetriever, 0, refreshInterval.toMillis(), TimeUnit.MILLISECONDS);
registeredTasks.put(feedId, retrieverFuture);
}

public void deregisterFeedRetriever(final String feedId) {
if (registeredTasks.containsKey(feedId)) {
final ScheduledFuture<?> retrieverFuture = registeredTasks.remove(feedId);
retrieverFuture.cancel(true);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.feed.retriever;

import org.opensearch.securityanalytics.connector.IOCConnector;
import org.opensearch.securityanalytics.feed.store.FeedStore;
import org.opensearch.securityanalytics.feed.store.model.UpdateType;
import org.opensearch.securityanalytics.model.IOC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class FeedRetriever implements Runnable {
private static final Logger log = LoggerFactory.getLogger(FeedRetriever.class);

private final IOCConnector iocConnector;
private final FeedStore feedStore;
private final UpdateType updateType;
private final String feedId;

public FeedRetriever(final IOCConnector iocConnector, final FeedStore feedStore, final UpdateType updateType, final String feedId) {
this.iocConnector = iocConnector;
this.feedStore = feedStore;
this.updateType = updateType;
this.feedId = feedId;
}

@Override
public void run() {
try {
final List<IOC> iocs = iocConnector.loadIOCs();
feedStore.storeIOCs(iocs, updateType);
} catch (final Exception e) {
log.error("Unable to fetch feed with ID {}", feedId, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.feed.store;

import org.opensearch.securityanalytics.feed.store.model.UpdateType;
import org.opensearch.securityanalytics.model.IOC;

import java.util.List;

public interface FeedStore {
/**
* Accepts a list of IOCs and stores them locally for use in feed processing
*
* @param iocs - A list of the IOCs to store
* @param updateType - The type of update to make to the underlying store
*/
void storeIOCs(List<IOC> iocs, UpdateType updateType);
}
Loading

0 comments on commit 3c71720

Please sign in to comment.