-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka Connect: Fix a bug in streams closing while read or write metadata files #11609
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -122,15 +122,25 @@ public static void write(TableMetadata metadata, OutputFile outputFile) { | |
public static void internalWrite( | ||
TableMetadata metadata, OutputFile outputFile, boolean overwrite) { | ||
boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP; | ||
try (OutputStream os = overwrite ? outputFile.createOrOverwrite() : outputFile.create(); | ||
OutputStream gos = isGzip ? new GZIPOutputStream(os) : os; | ||
OutputStream os = overwrite ? outputFile.createOrOverwrite() : outputFile.create(); | ||
// if isGzip is true, os will be closed by GZIPOutputStream, | ||
// otherwise, os will be closed by try-with-resources | ||
try (OutputStream gos = isGzip ? new GZIPOutputStream(os) : os; | ||
Comment on lines
+125
to
+128
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may want to have some sort of internal output stream implementation which delegates to the gzip or underlying oputput stream, and handles the complexity of closing in case of failures in GzipOutputStream constructor, and propogating that exception. Then the code below really just would have the delegate in the try with closeable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 to that idea. We should also use this for |
||
OutputStreamWriter writer = new OutputStreamWriter(gos, StandardCharsets.UTF_8)) { | ||
JsonGenerator generator = JsonUtil.factory().createGenerator(writer); | ||
generator.useDefaultPrettyPrinter(); | ||
toJson(metadata, generator); | ||
generator.flush(); | ||
} catch (IOException e) { | ||
throw new RuntimeIOException(e, "Failed to write json to file: %s", outputFile); | ||
} finally { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it would be good to reproduce the root cause of this in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. additionally, whatever fix we do, we should also apply to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will try to reproduce this in a unit test |
||
try { | ||
// in case of an exception in GZIPOutputStream constructor, | ||
// the stream is not closed and needs to be closed here | ||
Comment on lines
+138
to
+139
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "Explicitly close the stream in case of exceptions thrown by GZIPOutputStream constructor. See #11220 for more details" |
||
os.close(); | ||
} catch (IOException ignored) { | ||
// ignore | ||
} | ||
} | ||
} | ||
|
||
|
@@ -277,11 +287,21 @@ public static TableMetadata read(FileIO io, String path) { | |
|
||
public static TableMetadata read(FileIO io, InputFile file) { | ||
Codec codec = Codec.fromFileName(file.location()); | ||
try (InputStream is = file.newStream(); | ||
InputStream gis = codec == Codec.GZIP ? new GZIPInputStream(is) : is) { | ||
InputStream is = file.newStream(); | ||
// if codec is GZIP, is will be closed by GZIPInputStream | ||
// otherwise, os will be closed by try-with-resources | ||
try (InputStream gis = codec == Codec.GZIP ? new GZIPInputStream(is) : is) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not gathering both statements in one try close ? It would be easier. Something like:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was the original code, but there is an edge case where the |
||
return fromJson(file, JsonUtil.mapper().readValue(gis, JsonNode.class)); | ||
} catch (IOException e) { | ||
throw new RuntimeIOException(e, "Failed to read file: %s", file); | ||
} finally { | ||
try { | ||
// in case of an exception in GZIPInputStream constructor, | ||
// the stream is not closed and needs to be closed here | ||
is.close(); | ||
} catch (IOException ignored) { | ||
// ignore | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not gathering all in the try-on-resource ? Something like: