Fix for [BUG] Data Prepper is losing connections from S3 pool
Signed-off-by: Krishna Kondaka <[email protected]>
Krishna Kondaka committed Dec 8, 2023
1 parent 668066f commit 95ce39b
Showing 5 changed files with 16 additions and 32 deletions.
@@ -11,6 +11,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.function.Consumer;
+import java.util.Objects;
 
 public interface InputCodec {
     /**
@@ -31,8 +32,19 @@ public interface InputCodec {
      * @param eventConsumer The consumer which handles each event from the stream
      * @throws IOException throws IOException when invalid input is received or incorrect codec name is provided
      */
-    void parse(
+    default void parse(
             InputFile inputFile,
             DecompressionEngine decompressionEngine,
-            Consumer<Record<Event>> eventConsumer) throws IOException;
+            Consumer<Record<Event>> eventConsumer) throws IOException {
+        Objects.requireNonNull(inputFile);
+        Objects.requireNonNull(eventConsumer);
+        InputStream inputStream = inputFile.newStream();
+        try {
+            parse(decompressionEngine.createInputStream(inputStream), eventConsumer);
+            inputStream.close();
+        } catch (Exception e) {
+            inputStream.close();
+            throw e;
+        }
+    }
 }
@@ -47,14 +47,6 @@ public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer
         parseAvroStream(inputStream, eventConsumer);
     }
 
-    @Override
-    public void parse(final InputFile inputFile, final DecompressionEngine decompressionEngine, final Consumer<Record<Event>> eventConsumer) throws IOException {
-        Objects.requireNonNull(inputFile);
-        Objects.requireNonNull(eventConsumer);
-
-        parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer);
-    }
-
     private void parseAvroStream(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) {
 
         try {
@@ -107,4 +99,4 @@ else if(value instanceof Utf8){
         return eventData;
     }
 
-}
+}
@@ -52,11 +52,6 @@ public void parse(final InputStream inputStream, final Consumer<Record<Event>> e
         }
     }
 
-    @Override
-    public void parse(final InputFile inputFile, DecompressionEngine decompressionEngine, Consumer<Record<Event>> eventConsumer) throws IOException {
-        parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer);
-    }
-
     private void parseBufferedReader(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
         final CsvMapper mapper = createCsvMapper();
         final CsvSchema schema;
@@ -48,11 +48,6 @@ public void parse(final InputStream inputStream, final Consumer<Record<Event>> e
         }
     }
 
-    @Override
-    public void parse(final InputFile inputFile, final DecompressionEngine decompressionEngine, final Consumer<Record<Event>> eventConsumer) throws IOException {
-        parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer);
-    }
-
     private void parseBufferedReader(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
         final boolean doAddHeaderToOutgoingEvents = Objects.nonNull(headerDestination);
         boolean hasReadHeader = false;
@@ -14,6 +14,7 @@
 import org.opensearch.dataprepper.model.codec.JsonDecoder;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Objects;
 import java.util.function.Consumer;
 
@@ -23,15 +24,4 @@
 @DataPrepperPlugin(name = "json", pluginType = InputCodec.class)
 public class JsonInputCodec extends JsonDecoder implements InputCodec {
 
-    @Override
-    public void parse(
-            final InputFile inputFile,
-            final DecompressionEngine decompressionEngine,
-            final Consumer<Record<Event>> eventConsumer) throws IOException {
-        Objects.requireNonNull(inputFile);
-        Objects.requireNonNull(eventConsumer);
-
-        parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer);
-    }
-
 }

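For context on the bug title, here is a self-contained, JDK-only sketch of the cleanup pattern the new default method applies. It uses no Data Prepper types; the class and method names are illustrative. The point it demonstrates: the stream handed out by the file, which in the reported bug is backed by a pooled S3 connection, is closed whether parsing succeeds or throws, so the connection is returned to the pool instead of leaking.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CloseOnBothPathsDemo {

    // Stand-in for the codec's stream-based parse(InputStream, Consumer) call.
    static void parse(final InputStream decompressed) throws IOException {
        decompressed.transferTo(System.out);
    }

    // Mirrors the shape of the new default parse(InputFile, DecompressionEngine, Consumer):
    // the underlying stream is closed on both the success and the failure path.
    static void parseWithCleanup(final InputStream pooledStream) throws IOException {
        try {
            parse(new GZIPInputStream(pooledStream));
            pooledStream.close();
        } catch (Exception e) {
            pooledStream.close();
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a small gzip payload in memory so the demo needs no network or S3.
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write("hello\n".getBytes());
        }
        parseWithCleanup(new ByteArrayInputStream(bytes.toByteArray()));
    }
}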