From ace65401f232f74151f37cf7e36c992630266174 Mon Sep 17 00:00:00 2001 From: OT-XY Date: Thu, 21 Sep 2023 15:14:44 +0800 Subject: [PATCH] [hotfix-#1821][hdfs]Modify orc file calculation method --- .../hdfs/sink/HdfsOrcOutputFormat.java | 2 +- .../hdfs/sink/HdfsParquetOutputFormat.java | 2 +- .../hdfs/sink/HdfsTextOutputFormat.java | 2 +- .../sink/format/BaseFileOutputFormat.java | 2 + chunjun-local-test/pom.xml | 121 +++++++++--------- 5 files changed, 69 insertions(+), 60 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java index 6adafddaad..75a8b53902 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java @@ -149,7 +149,7 @@ protected void openSource() { @Override // todo the deviation needs to be calculated accurately protected long getCurrentFileSize() { - return (long) (bytesWriteCounter.getLocalValue() * getDeviation()); + return (long) ((bytesWriteCounter.getLocalValue() - lastWriteSize) * getDeviation()); } @Override diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java index 445c69d227..a182a4ca4d 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java @@ -156,7 +156,7 @@ protected void nextBlock() { public void flushDataInternal() { log.info( "Close current parquet record writer, write data size:[{}]", - SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue())); + SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue() - lastWriteSize)); try { if (writer != null) { writer.close(); diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java index a35a733a33..2b54cf2912 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java @@ -77,7 +77,7 @@ protected void nextBlock() { public void flushDataInternal() { log.info( "Close current text stream, write data size:[{}]", - SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue())); + SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue() - lastWriteSize)); try { if (stream != null) { diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java b/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java index 2f84d08fc6..0e65501a68 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java @@ -58,6 +58,7 @@ public abstract class BaseFileOutputFormat extends BaseRichOutputFormat { protected List preCommitFilePathList = new ArrayList<>(); protected long nextNumForCheckDataSize; protected long lastWriteTime = System.currentTimeMillis(); + protected long lastWriteSize; @Override public void initializeGlobal(int parallelism) { @@ -150,6 +151,7 @@ public void flushData() { if (rowsOfCurrentBlock != 0) { flushDataInternal(); sumRowsOfBlock += rowsOfCurrentBlock; + lastWriteSize = bytesWriteCounter.getLocalValue(); log.info( "flush file:{}, rowsOfCurrentBlock = {}, sumRowsOfBlock = {}", currentFileName, diff --git a/chunjun-local-test/pom.xml b/chunjun-local-test/pom.xml index e476e2142d..22f564d18b 100644 --- a/chunjun-local-test/pom.xml +++ b/chunjun-local-test/pom.xml @@ -51,58 +51,71 @@ chunjun-connector-stream ${project.version} - - com.dtstack.chunjun - chunjun-connector-binlog - ${project.version} - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - com.dtstack.chunjun - chunjun-connector-iceberg - ${project.version} - - - com.dtstack.chunjun - chunjun-connector-mongodb - ${project.version} - - - - com.dtstack.chunjun - chunjun-connector-emqx - ${project.version} - - - com.dtstack.chunjun - chunjun-connector-jdbc-base - ${project.version} - - - - com.dtstack.chunjun - chunjun-connector-mysql - ${project.version} - - - - com.dtstack.chunjun - chunjun-connector-kafka - ${project.version} - - - - com.dtstack.chunjun - chunjun-connector-kingbase - ${project.version} - - - - - com.dtstack.chunjun - chunjun-connector-ftp - ${project.version} - org.apache.flink @@ -110,12 +123,6 @@ ${flink.version} - - com.dtstack.chunjun - chunjun-connector-oceanbase - master - - org.apache.avro avro