From 64f9b5dd55732004f0a0b51aa0d87a9a8d60dee0 Mon Sep 17 00:00:00 2001
From: hussein-awala
Date: Thu, 21 Nov 2024 01:06:47 +0100
Subject: [PATCH] Kafka Connect: Fix a bug in closing streams while reading or writing metadata files

---
 .../apache/iceberg/TableMetadataParser.java | 28 ++++++++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index c244b3996c9e..bf0ef536a9d1 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -122,8 +122,10 @@ public static void write(TableMetadata metadata, OutputFile outputFile) {
   public static void internalWrite(
       TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
     boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
-    try (OutputStream os = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
-        OutputStream gos = isGzip ? new GZIPOutputStream(os) : os;
+    OutputStream os = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
+    // if isGzip is true, os will be closed by GZIPOutputStream,
+    // otherwise, os will be closed by try-with-resources
+    try (OutputStream gos = isGzip ? new GZIPOutputStream(os) : os;
         OutputStreamWriter writer = new OutputStreamWriter(gos, StandardCharsets.UTF_8)) {
       JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
       generator.useDefaultPrettyPrinter();
@@ -131,6 +133,14 @@ public static void internalWrite(
       generator.flush();
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to write json to file: %s", outputFile);
+    } finally {
+      try {
+        // in case of an exception in the GZIPOutputStream constructor,
+        // the stream is not closed and needs to be closed here
+        os.close();
+      } catch (IOException ignored) {
+        // ignore
+      }
     }
   }
 
@@ -277,11 +287,21 @@ public static TableMetadata read(FileIO io, String path) {
 
   public static TableMetadata read(FileIO io, InputFile file) {
     Codec codec = Codec.fromFileName(file.location());
-    try (InputStream is = file.newStream();
-        InputStream gis = codec == Codec.GZIP ? new GZIPInputStream(is) : is) {
+    InputStream is = file.newStream();
+    // if codec is GZIP, is will be closed by GZIPInputStream,
+    // otherwise, is will be closed by try-with-resources
+    try (InputStream gis = codec == Codec.GZIP ? new GZIPInputStream(is) : is) {
       return fromJson(file, JsonUtil.mapper().readValue(gis, JsonNode.class));
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to read file: %s", file);
+    } finally {
+      try {
+        // in case of an exception in the GZIPInputStream constructor,
+        // the stream is not closed and needs to be closed here
+        is.close();
+      } catch (IOException ignored) {
+        // ignore
+      }
     }
   }
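
For context, a minimal stand-alone sketch (not part of the patch) of the close-handling pattern the change adopts. The class name WrapOrPassThroughWrite and the writeJson helper are hypothetical, and java.nio.file stands in for Iceberg's OutputFile/TableMetadata: the raw stream is opened before the try-with-resources block, the GZIP wrapper owns it once construction succeeds, and the finally clause covers the case where the GZIPOutputStream constructor throws before ownership is transferred.

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPOutputStream;

public class WrapOrPassThroughWrite {

  // Hypothetical helper; mirrors the pattern in internalWrite without Iceberg types.
  static void writeJson(Path target, String json, boolean gzip) {
    OutputStream raw;
    try {
      raw = Files.newOutputStream(target);
    } catch (IOException e) {
      throw new RuntimeException("Failed to open " + target, e);
    }

    // If gzip is true, raw is closed when the wrapping GZIPOutputStream is closed;
    // otherwise try-with-resources closes raw directly (it is the same object as out).
    try (OutputStream out = gzip ? new GZIPOutputStream(raw) : raw;
        OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
      writer.write(json);
    } catch (IOException e) {
      throw new RuntimeException("Failed to write " + target, e);
    } finally {
      try {
        // The GZIPOutputStream constructor writes the GZIP header and can throw;
        // in that case no resource was registered by the try block, so the raw
        // stream must be closed here. Closing an already-closed stream is a no-op
        // for typical OutputStream implementations.
        raw.close();
      } catch (IOException ignored) {
        // best-effort cleanup
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("metadata", ".json");
    writeJson(tmp, "{\"format-version\": 2}", true);
    System.out.println("wrote " + Files.size(tmp) + " bytes to " + tmp);
  }
}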