Skip to content

Commit

Permalink
[hotfix-DTStack#1821][hdfs]Modify orc file calculation method
Browse files Browse the repository at this point in the history
  • Loading branch information
OT-XY committed Sep 21, 2023
1 parent 45adcc0 commit ace6540
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected void openSource() {
@Override
// todo the deviation needs to be calculated accurately
protected long getCurrentFileSize() {
return (long) (bytesWriteCounter.getLocalValue() * getDeviation());
return (long) ((bytesWriteCounter.getLocalValue() - lastWriteSize) * getDeviation());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected void nextBlock() {
public void flushDataInternal() {
log.info(
"Close current parquet record writer, write data size:[{}]",
SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue()));
SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue() - lastWriteSize));
try {
if (writer != null) {
writer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected void nextBlock() {
public void flushDataInternal() {
log.info(
"Close current text stream, write data size:[{}]",
SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue()));
SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue() - lastWriteSize));

try {
if (stream != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public abstract class BaseFileOutputFormat extends BaseRichOutputFormat {
protected List<String> preCommitFilePathList = new ArrayList<>();
protected long nextNumForCheckDataSize;
protected long lastWriteTime = System.currentTimeMillis();
protected long lastWriteSize;

@Override
public void initializeGlobal(int parallelism) {
Expand Down Expand Up @@ -150,6 +151,7 @@ public void flushData() {
if (rowsOfCurrentBlock != 0) {
flushDataInternal();
sumRowsOfBlock += rowsOfCurrentBlock;
lastWriteSize = bytesWriteCounter.getLocalValue();
log.info(
"flush file:{}, rowsOfCurrentBlock = {}, sumRowsOfBlock = {}",
currentFileName,
Expand Down
121 changes: 64 additions & 57 deletions chunjun-local-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,71 +51,78 @@
<artifactId>chunjun-connector-stream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-binlog</artifactId>
<version>${project.version}</version>
</dependency>
<!--rdb test start-->
<!-- <dependency>-->
<!-- <groupId>com.dtstack.chunjun</groupId>-->
<!-- <artifactId>chunjun-connector-mysql</artifactId>-->
<!-- <version>${project.version}</version>-->
<!-- </dependency>-->
<!--rdb test end-->

<!--hdfs test start-->
<!-- <dependency>-->
<!-- <groupId>com.dtstack.chunjun</groupId>-->
<!-- <artifactId>chunjun-connector-hdfs</artifactId>-->
<!-- <version>${project.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <artifactId>hadoop-mapreduce-client-core</artifactId>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>commons-cli</artifactId>-->
<!-- <groupId>commons-cli</groupId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <artifactId>snappy-java</artifactId>-->
<!-- <groupId>org.xerial.snappy</groupId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <version>2.7.5</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <artifactId>hadoop-common</artifactId>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>gson</artifactId>-->
<!-- <groupId>com.google.code.gson</groupId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <artifactId>commons-cli</artifactId>-->
<!-- <groupId>commons-cli</groupId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <artifactId>snappy-java</artifactId>-->
<!-- <groupId>org.xerial.snappy</groupId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <version>2.7.5</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <artifactId>hadoop-hdfs</artifactId>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>commons-cli</artifactId>-->
<!-- <groupId>commons-cli</groupId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <artifactId>netty-all</artifactId>-->
<!-- <groupId>io.netty</groupId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <version>2.7.5</version>-->
<!-- </dependency>-->
<!--hdfs test end-->

<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-iceberg</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-mongodb</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-emqx</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-jdbc-base</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-mysql</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-kafka</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-kingbase</artifactId>
<version>${project.version}</version>
</dependency>

<!-- -->
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-ftp</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-oceanbase</artifactId>
<version>master</version>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
Expand Down

0 comments on commit ace6540

Please sign in to comment.