From 1f2dae8a544d468262036a4683c5ddda3ff3a621 Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Tue, 11 Jun 2024 16:42:09 +0800 Subject: [PATCH 1/4] refactor: update es and upgrade file --- .../java/com/xiaomi/mone/es/EsClient.java | 6 +++--- .../com/xiaomi/mone/es/test/EsClientTest.java | 5 +++-- .../mone/es/test/EsProcessorClientTest.java | 7 ++++--- .../java/com/xiaomi/mone/file/ILogFile.java | 7 +++++++ .../java/com/xiaomi/mone/file/LogFile.java | 12 +++++++++++ .../java/com/xiaomi/mone/file/LogFile2.java | 20 ++++++++++++++++--- .../com/xiaomi/mone/file/LogFileTest.java | 6 +++--- 7 files changed, 49 insertions(+), 14 deletions(-) diff --git a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java index f557c4d83..e2b145d2f 100644 --- a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java +++ b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java @@ -58,8 +58,8 @@ public class EsClient { private static Sniffer sniffer; - private static final int SNIFF_INTERVAL_MILLIS = 30 * 1000; - private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 30 * 1000; + private static final int SNIFF_INTERVAL_MILLIS = 60 * 1000 * 3; + private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 60 * 1000; private static final int MAX_CONN_PER_ROUTE = 500; private static final int MAX_CONN_TOTAL = 500; private static final int SOCKET_TIMEOUT_MS = 10 * 60 * 1000; @@ -196,7 +196,7 @@ private void initializeSniffer() { .setSniffAfterFailureDelayMillis(SNIFF_AFTER_FAILURE_DELAY_MILLIS) .setNodesSniffer(new ElasticsearchNodesSniffer( restClient, - TimeUnit.SECONDS.toMillis(5), + TimeUnit.SECONDS.toMillis(60), ElasticsearchNodesSniffer.Scheme.HTTP)) .build(); sniffOnFailureListener.setSniffer(sniffer); diff --git a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java index 97ef93055..bc9b5a24f 100644 --- a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java +++ b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java @@ -226,8 +226,9 @@ public void getClusterHealth() throws IOException { @Test public void queryIndexMetadataTest() throws IOException { - GetMappingsResponse metadata = client.queryIndexMapping("zgq_common_milog_staging_app_private_1"); - System.out.println(String.format("result:%s", gson.toJson(metadata))); + GetMappingsResponse metadata = client.queryIndexMapping("test_scf_log_index"); +// Map mappings = metadata.mappings(); +// System.out.println(String.format("result:%s", gson.toJson(metadata))); } @Test diff --git a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java index 7653a6d56..a6a7fb69b 100644 --- a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java +++ b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java @@ -22,7 +22,7 @@ public void bulkInsert() throws InterruptedException { NacosConfig config = new NacosConfig(); config.setDataId("zzy_new"); - config.init(); +// config.init(); String ip = config.getConfig("es_ip"); String user = config.getConfig("es_user"); @@ -53,13 +53,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) int count = 0; while (true) { // processor.bulkInsert(indexName, data); - processor.bulkUpsert(indexName, "YpzPE4UBt3Uy5NFQ1V5e", data); + processor.bulkInsert(indexName, data); count++; if (count == n) { break; } } - Thread.sleep(10000l); +// Thread.sleep(10000l); + System.in.read(); }catch (Exception e){ e.printStackTrace(); } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java index 0a898721e..f78f883e6 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java @@ -16,5 +16,12 @@ public interface ILogFile { void initLogFile(String file, ReadListener listener, long pointer, long lineNumber); + /** + * It only needs to be called when an exception occurs and can only be called externally. + */ + void setExceptionFinish(); + + boolean getExceptionFinish(); + } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java index 5f8b82b61..71654948e 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java @@ -35,6 +35,8 @@ public class LogFile implements ILogFile { @Setter private volatile boolean reFresh; + private volatile boolean exceptionFinish; + @Getter private int beforePointerHashCode; @@ -171,6 +173,16 @@ public void initLogFile(String file, ReadListener listener, long pointer, long l this.lineNumber = lineNumber; } + @Override + public void setExceptionFinish() { + exceptionFinish = true; + } + + @Override + public boolean getExceptionFinish() { + return exceptionFinish; + } + private String lineCutOff(String line) { if (null != line) { //todo 大行文件先临时截断 diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java index 27a921211..e77743c3a 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java @@ -42,6 +42,8 @@ public class LogFile2 implements ILogFile { @Setter private volatile boolean reFresh; + private volatile boolean exceptionFinish; + @Getter private int beforePointerHashCode; @@ -110,12 +112,13 @@ private void open() { } } + @Override public void readLine() throws IOException { while (true) { open(); //兼容文件切换时,缓存的pointer try { - log.info("open file:{},pointer:{}", file, this.pointer); + log.info("open file:{},pointer:{},lineNumber:{},", file, this.pointer, this.lineNumber); if (pointer > raf.length()) { pointer = 0; lineNumber = 0; @@ -123,7 +126,7 @@ public void readLine() throws IOException { } catch (Exception e) { log.error("file.length() IOException, file:{}", this.file, e); } - log.info("rel open file:{},pointer:{}", file, this.pointer); + log.info("rel open file:{},pointer:{},lineNumber:{}", file, this.pointer, this.lineNumber); raf.seek(pointer); while (true) { @@ -166,6 +169,7 @@ public void readLine() throws IOException { } if (listener.isBreak(line)) { + log.info("isBreak:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, this.fileKey); stop = true; break; } @@ -193,7 +197,7 @@ public void readLine() throws IOException { } raf.close(); if (stop) { - log.info("stop:{},pointer:{},fileKey:{}", this.file, this.pointer, this.fileKey); + log.info("stop:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, this.fileKey); FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).fileName(this.file).build()); break; } @@ -209,6 +213,16 @@ public void initLogFile(String file, ReadListener listener, long pointer, long l this.lineNumber = lineNumber; } + @Override + public void setExceptionFinish() { + exceptionFinish = true; + } + + @Override + public boolean getExceptionFinish() { + return exceptionFinish; + } + private String lineCutOff(String line) { if (null != line) { //todo 大行文件先临时截断 diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java index 363f59624..8644d3e01 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java @@ -74,12 +74,12 @@ public void testLogFileMonitor() { monitor.setListener(new DefaultMonitorListener(monitor, readEvent -> { System.out.println(readEvent.getReadResult().getLines()); })); - String fileName = "/home/work/log/test/provider/server.log.*"; + String fileName = "/home/work/log/file.log.*"; Pattern pattern = Pattern.compile(fileName); - monitor.reg("/home/work/log/test/provider/", it -> { + monitor.reg("/home/work/log", it -> { boolean matches = pattern.matcher(it).matches(); log.info("file:{},matches:{}", it, matches); - return matches; + return true; }); log.info("reg finish"); System.in.read(); From 410775fb20e6de5f1a500689b24462c1e98bb384 Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Wed, 14 Aug 2024 14:47:53 +0800 Subject: [PATCH 2/4] fix: Fixed the issue of reading a large number of files OOM and upgraded the ES version --- .../run/mone/ai/google/test/ClientTest.java | 2 +- jcommon/es/pom.xml | 162 ++++++++---------- .../java/com/xiaomi/mone/es/EsClient.java | 13 +- .../java/com/xiaomi/mone/es/EsProcessor.java | 2 +- .../com/xiaomi/mone/es/test/EsClientTest.java | 2 +- .../mone/es/test/EsProcessorClientTest.java | 146 ++++++++-------- .../java/com/xiaomi/mone/file/ILogFile.java | 2 + .../java/com/xiaomi/mone/file/LogFile.java | 2 +- .../java/com/xiaomi/mone/file/LogFile2.java | 2 - .../java/com/xiaomi/mone/file/LogFileWS.java | 2 - .../main/java/com/xiaomi/mone/file/MLog.java | 4 +- .../mone/file/common/FileInfoCache.java | 9 +- .../xiaomi/mone/file/event/EventListener.java | 3 + .../file/listener/DefaultMonitorListener.java | 22 ++- .../mone/file/ozhera/HeraFileMonitor.java | 6 + .../com/xiaomi/mone/file/LogFileTest.java | 2 +- 16 files changed, 197 insertions(+), 184 deletions(-) diff --git a/jcommon/ai/google/src/test/java/run/mone/ai/google/test/ClientTest.java b/jcommon/ai/google/src/test/java/run/mone/ai/google/test/ClientTest.java index 575d843bf..8d8f74d68 100644 --- a/jcommon/ai/google/src/test/java/run/mone/ai/google/test/ClientTest.java +++ b/jcommon/ai/google/src/test/java/run/mone/ai/google/test/ClientTest.java @@ -22,7 +22,7 @@ public void test1() { RequestPayload payload = RequestPayload.builder().maxTokens(4000).anthropicVersion("vertex-2023-10-16").messages(Lists.newArrayList(Message.builder().role("user") .content(content) .build())).build(); - ResponsePayload r = c.call(c.token(), payload); + ResponsePayload r = c.call(c.token(""), payload); System.out.println(r.getContent().get(0).getText()); } } diff --git a/jcommon/es/pom.xml b/jcommon/es/pom.xml index 8312ca347..20d64f314 100644 --- a/jcommon/es/pom.xml +++ b/jcommon/es/pom.xml @@ -1,91 +1,75 @@ - - 4.0.0 - - run.mone - jcommon - 1.6.0-jdk21-SNAPSHOT - - es - 1.5-jdk21-SNAPSHOT - - - org.elasticsearch.client - elasticsearch-rest-high-level-client - 7.10.0 - - - httpclient - org.apache.httpcomponents - - - httpcore - org.apache.httpcomponents - - - httpcore-nio - org.apache.httpcomponents - - - - - org.elasticsearch.client - elasticsearch-rest-client-sniffer - 7.10.0 - - - org.apache.httpcomponents - httpclient - 4.5.12 - - - org.apache.httpcomponents - httpcore-nio - 4.4.13 - - - org.apache.httpcomponents - httpcore - 4.4.13 - - - run.mone - nacos - 1.4-v1-jdk20-SNAPSHOT - test - - - - - - maven-compiler-plugin - 3.11.0 - - 21 - 21 - true - UTF-8 - - ${project.basedir}/src/main/java - - - - - maven-source-plugin - 2.1 - - - compile - - jar - - - - - true - - - - + + 4.0.0 + + run.mone + jcommon + 1.6.0-jdk21-SNAPSHOT + + es + 1.7-jdk8-SNAPSHOT + + + UTF-8 + 7.17.21 + + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + ${es.version} + + + org.elasticsearch.client + elasticsearch-rest-client-sniffer + ${es.version} + + + org.elasticsearch + elasticsearch + ${es.version} + + + + run.mone + nacos + 1.4-v1-jdk20-SNAPSHOT + test + + + + + + maven-compiler-plugin + 3.11.0 + + 8 + 8 + true + UTF-8 + + ${project.basedir}/src/main/java + + + + + maven-source-plugin + 2.1 + + + compile + + jar + + + + + true + + + + diff --git a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java index e2b145d2f..764e1b8cd 100644 --- a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java +++ b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java @@ -25,18 +25,14 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.*; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.client.indices.*; import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer; import org.elasticsearch.client.sniff.SniffOnFailureListener; import org.elasticsearch.client.sniff.Sniffer; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -47,6 +43,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xcontent.XContentType; import java.io.IOException; import java.util.*; @@ -186,7 +183,9 @@ private RestClientBuilder createRestClientBuilder(List hosts, Header[] } private void initializeHighLevelClient(RestClientBuilder clientBuilder) { - this.client = new RestHighLevelClient(clientBuilder); + this.client = new RestHighLevelClientBuilder(clientBuilder.build()) + .setApiCompatibilityMode(true) + .build(); this.restClient = client.getLowLevelClient(); } diff --git a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsProcessor.java b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsProcessor.java index 7c9d7e71d..73e04962c 100644 --- a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsProcessor.java +++ b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsProcessor.java @@ -9,7 +9,7 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.TimeValue; import java.util.Map; import java.util.concurrent.TimeUnit; diff --git a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java index bc9b5a24f..24e4d3e84 100644 --- a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java +++ b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java @@ -14,7 +14,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.indices.GetMappingsResponse; import org.elasticsearch.client.indices.IndexTemplatesExistRequest; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; diff --git a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java index a6a7fb69b..12a50198f 100644 --- a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java +++ b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java @@ -1,69 +1,79 @@ -package com.xiaomi.mone.es.test; - -import com.xiaomi.data.push.nacos.NacosConfig; -import com.xiaomi.mone.es.EsProcessor; -import com.xiaomi.mone.es.EsClient; -import com.xiaomi.mone.es.ProcessorConf; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.junit.Test; - -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -public class EsProcessorClientTest { - - @Test - public void bulkInsert() throws InterruptedException { - - NacosConfig config = new NacosConfig(); - config.setDataId("zzy_new"); -// config.init(); - - String ip = config.getConfig("es_ip"); - String user = config.getConfig("es_user"); - String pwd = config.getConfig("es_password"); - ProcessorConf conf = new ProcessorConf(100, 5, 1, 100, 3, 5, new EsClient(ip, user, pwd), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - System.out.println("before insert" + request); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - System.out.println("success after,request:" + request.getDescription() + " resopnse:" + Arrays.toString(response.getItems())); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - System.out.println("success after,request:" + request + " failure:" + failure); - } - }); - EsProcessor processor = new EsProcessor(conf); - try { - String indexName = "zgq_common_milog_staging_free_private_1-" + new SimpleDateFormat("yyyy.MM.dd").format(new Date()); - Map data = new HashMap<>(); - data.put("timestamp", System.currentTimeMillis()); - data.put("filename", "/home/work/log/log-manager/server.log1"); - int n = 1; - int count = 0; - while (true) { +//package com.xiaomi.mone.es.test; +// +//import com.google.common.reflect.TypeToken; +//import com.google.gson.Gson; +//import com.xiaomi.data.push.nacos.NacosConfig; +//import com.xiaomi.mone.es.EsClient; +//import com.xiaomi.mone.es.EsProcessor; +//import com.xiaomi.mone.es.ProcessorConf; +//import org.elasticsearch.action.bulk.BulkProcessor; +//import org.elasticsearch.action.bulk.BulkRequest; +//import org.elasticsearch.action.bulk.BulkResponse; +//import org.junit.Test; +// +//import java.text.SimpleDateFormat; +//import java.time.Instant; +//import java.util.Arrays; +//import java.util.Date; +//import java.util.HashMap; +//import java.util.Map; +// +//public class EsProcessorClientTest { +// +// @Test +// public void bulkInsert() throws InterruptedException { +// +// String str = "{\"@timestamp\":\"2024-06-20T19:39:15.871+08:00\",\"@version\":\"1\",\"message\":\"hello world data test wtt~\",\"logger_name\":\"com.xiaomi.ai.Application\",\"thread_name\":\"http-nio-10010-exec-3\",\"level\":\"INFO\",\"level_value\":20000,\"LOG_NAME\":\"ai-workflow\",\"SENTRY_ENABLED\":\"false\",\"user_name\":\"wangjunfei3\",\"user_team\":\"ncl7150\",\"request_uri\":\"/hello\",\"trace_id\":\"9cf73bfe51e877a83806ac01b6630815\",\"trace_flags\":\"01\",\"span_id\":\"c13434f908acebb4\"}"; +// Map data = new Gson().fromJson(str, new TypeToken>() { +// }.getType()); +// data.put("timeStamp", System.currentTimeMillis()); +// +// NacosConfig config = new NacosConfig(); +// config.setDataId("zzy_new"); +//// config.init(); +// +// String ip = "zjydw.api.es.srv:80"; +// String user = config.getConfig("es_user"); +// String pwd = config.getConfig("es_password"); +// String token = "4244b7014a5c44fea63bea711c7697fe"; +// String catalog = "es_zjy_log"; +// String database = "default"; +// +// EsClient esClient = new EsClient(ip, token, catalog, database); +// ProcessorConf conf = new ProcessorConf(100, 5, 1, 100, 3, 5, esClient, new BulkProcessor.Listener() { +// @Override +// public void beforeBulk(long executionId, BulkRequest request) { +// System.out.println("before insert" + request); +// } +// +// @Override +// public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { +// System.out.println("success after,request:" + request.getDescription() + " resopnse:" + Arrays.toString(response.getItems())); +// } +// +// @Override +// public void afterBulk(long executionId, BulkRequest request, Throwable failure) { +// System.out.println("success after,request:" + request + " failure:" + failure); +// } +// }); +// EsProcessor processor = new EsProcessor(conf); +// try { +// String indexName = "prod_hera_index_95956-" + new SimpleDateFormat("yyyy.MM.dd").format(new Date()); +// int n = 1; +// int count = 0; +// while (true) { +//// processor.bulkInsert(indexName, data); // processor.bulkInsert(indexName, data); - processor.bulkInsert(indexName, data); - count++; - if (count == n) { - break; - } - } -// Thread.sleep(10000l); - System.in.read(); - }catch (Exception e){ - e.printStackTrace(); - } - - } -} \ No newline at end of file +// count++; +// if (count == n) { +// break; +// } +// } +//// Thread.sleep(10000l); +// System.in.read(); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// +// } +//} \ No newline at end of file diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java index f78f883e6..3889f4f08 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java @@ -8,6 +8,8 @@ */ public interface ILogFile { + int LINE_MAX_LENGTH = 1100000; + void readLine() throws IOException; void setStop(boolean stop); diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java index 71654948e..ddc7878a7 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java @@ -52,7 +52,7 @@ public class LogFile implements ILogFile { private String md5; - private static final int LINE_MAX_LENGTH = 50000; + // private static final int LINE_MAX_LENGTH = 50000; public LogFile() { diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java index e77743c3a..fbbd399c4 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java @@ -58,8 +58,6 @@ public class LogFile2 implements ILogFile { private String md5; - private static final int LINE_MAX_LENGTH = 50000; - public LogFile2() { } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFileWS.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFileWS.java index 08043057b..8b2ade870 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFileWS.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFileWS.java @@ -40,8 +40,6 @@ public class LogFileWS extends LogFile { private String md5; - private static final int LINE_MAX_LENGTH = 50000; - private WatchService ws; { diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/MLog.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/MLog.java index 0f96fa63d..6e5bbc6f4 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/MLog.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/MLog.java @@ -53,9 +53,9 @@ public class MLog { // private static final Pattern ERROR_LINE_PATTERN = Pattern.compile(".*ERROR|.*(WARN|INFO).*(Exception|exception|error|Profiler)"); // private static final int MAX_ERROR_LINE_MATCH_LENGTH = 300; /** - * 最多聚合200行错误栈,避免queue无限增长 + * 最多聚合800行错误栈,避免queue无限增长 */ - private static final int MAX_MERGE_LINE = 400; + private static final int MAX_MERGE_LINE = 800; @Deprecated public List append(String msg) { diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java index 968a5a83a..b065fb0db 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java @@ -78,9 +78,12 @@ public void load() { } } - @SneakyThrows public void load(String filePath) { - this.filePath = filePath; - this.load(); + try { + this.filePath = filePath; + this.load(); + } catch (Exception e) { + log.error("load cache error,filePath:{}", filePath, e); + } } } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java index 860e6537c..fb35d1cfc 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java @@ -8,4 +8,7 @@ public interface EventListener { void onEvent(FileEvent event); + default void remove(Object fileKey) { + } + } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java index 5347cd10f..f7a855994 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java @@ -8,11 +8,11 @@ import com.xiaomi.mone.file.event.EventType; import com.xiaomi.mone.file.event.FileEvent; import com.xiaomi.mone.file.ozhera.HeraFileMonitor; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @@ -28,8 +28,7 @@ public class DefaultMonitorListener implements EventListener { private Consumer consumer; - @Getter - private List readListenerList = new CopyOnWriteArrayList<>(); + private Map readListenerMap = new ConcurrentHashMap<>(); private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor(); @@ -44,7 +43,7 @@ public void onEvent(FileEvent event) { log.info("log file:{}", event.getFileName()); LogFile2 logFile = new LogFile2(event.getFileName()); OzHeraReadListener ozHeraReadListener = new OzHeraReadListener(monitor, logFile, consumer); - readListenerList.add(ozHeraReadListener); + readListenerMap.put(event.getFileKey(), ozHeraReadListener); pool.submit(() -> { logFile.setListener(ozHeraReadListener); SafeRun.run(logFile::readLine); @@ -58,6 +57,7 @@ public void onEvent(FileEvent event) { if (event.getType().equals(EventType.delete)) { log.info("delete:{}", event.getFileName()); + readListenerMap.remove(event.getFileKey()); } if (event.getType().equals(EventType.empty)) { @@ -71,11 +71,21 @@ public void onEvent(FileEvent event) { // LogFile2 logFile = new LogFile2(event.getFileName()); LogFile2 logFile = new LogFile2(event.getFileName(), 0, 0); OzHeraReadListener ozHeraReadListener = new OzHeraReadListener(monitor, logFile, consumer); - readListenerList.add(ozHeraReadListener); + readListenerMap.put(event.getFileKey(), ozHeraReadListener); + pool.submit(() -> { logFile.setListener(ozHeraReadListener); SafeRun.run(logFile::readLine); }); } } + + @Override + public void remove(Object fileKey) { + readListenerMap.remove(fileKey); + } + + public List getReadListenerList() { + return this.readListenerMap.values().stream().toList(); + } } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java index f7add647a..c8a2d1e84 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java @@ -39,6 +39,9 @@ public class HeraFileMonitor { @Setter private EventListener listener; + @Setter + private volatile boolean stop; + public HeraFileMonitor() { this(TimeUnit.SECONDS.toMillis(30)); } @@ -53,10 +56,12 @@ public HeraFileMonitor(long removeTime) { remList.add(Pair.of(it.getFileName(), it.getFileKey())); } }); + remList.forEach(it -> { log.info("remove file:{}", it.getKey()); fileMap.remove(it.getKey()); map.remove(it.getValue()); + listener.remove(it.getValue()); }); } catch (Throwable ex) { log.error(ex.getMessage(), ex); @@ -85,6 +90,7 @@ public void reg(String path, Predicate predicate) throws IOException, In directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE); while (true) { try { + WatchKey key = watchService.take(); try { for (WatchEvent event : key.pollEvents()) { diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java index 8644d3e01..d7b987194 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java @@ -78,7 +78,7 @@ public void testLogFileMonitor() { Pattern pattern = Pattern.compile(fileName); monitor.reg("/home/work/log", it -> { boolean matches = pattern.matcher(it).matches(); - log.info("file:{},matches:{}", it, matches); + log.info("file:{},matches:{}", it, true); return true; }); log.info("reg finish"); From 7a6ad16a46d8320e808ef74a0ade62740d5fd818 Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Wed, 14 Aug 2024 14:53:13 +0800 Subject: [PATCH 3/4] refactor: update compile jdk version --- jcommon/es/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jcommon/es/pom.xml b/jcommon/es/pom.xml index 20d64f314..b5a376445 100644 --- a/jcommon/es/pom.xml +++ b/jcommon/es/pom.xml @@ -46,8 +46,8 @@ maven-compiler-plugin 3.11.0 - 8 - 8 + 21 + 21 true UTF-8 From ab9f2ac4cf872380f14f42a06dde8b56031fae1b Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Thu, 29 Aug 2024 19:48:41 +0800 Subject: [PATCH 4/4] refactor: update file coll --- .../java/com/xiaomi/mone/file/LogFile2.java | 1 + .../com/xiaomi/mone/file/ReadListener.java | 3 +++ .../mone/file/listener/OzHeraReadListener.java | 8 ++++++++ .../com/xiaomi/mone/file/ozhera/HeraFile.java | 3 +++ .../mone/file/ozhera/HeraFileMonitor.java | 12 +++++++++--- .../java/com/xiaomi/mone/file/LogFileTest.java | 18 +++++++++++++++--- 6 files changed, 39 insertions(+), 6 deletions(-) diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java index fbbd399c4..0381a698a 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java @@ -190,6 +190,7 @@ public void readLine() throws IOException { readResult.setFilePathName(file); readResult.setLineNumber(++lineNumber); ReadEvent event = new ReadEvent(readResult); + listener.setReadTime(); listener.onEvent(event); } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java index 0329b66bf..75e92b157 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java @@ -37,4 +37,7 @@ default void setPointer(Object obj) { default void saveProgress() { } + default void setReadTime() { + } + } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java index e2b3c2d42..43fa64876 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java @@ -72,4 +72,12 @@ public void setPointer(Object obj) { } } } + + @Override + public void setReadTime() { + HeraFile f = monitor.getFileMap().get(logFile.getFileKey()); + if (null != f) { + f.getReadTime().set(System.currentTimeMillis()); + } + } } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java index 8acb1a989..fbbde66fe 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java @@ -30,4 +30,7 @@ public class HeraFile { @Builder.Default private AtomicLong utime = new AtomicLong(System.currentTimeMillis()); + + @Builder.Default + private AtomicLong readTime = new AtomicLong(System.currentTimeMillis()); } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java index c8a2d1e84..0f7c7d385 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java @@ -34,6 +34,7 @@ public class HeraFileMonitor { @Getter private ConcurrentHashMap map = new ConcurrentHashMap<>(); + @Getter private ConcurrentHashMap fileMap = new ConcurrentHashMap<>(); @Setter @@ -52,7 +53,7 @@ public HeraFileMonitor(long removeTime) { List> remList = Lists.newArrayList(); long now = System.currentTimeMillis(); fileMap.values().forEach(it -> { - if (now - it.getUtime().get() >= removeTime) { + if (now - it.getUtime().get() >= removeTime && now - it.getReadTime().get() >= removeTime) { remList.add(Pair.of(it.getFileName(), it.getFileKey())); } }); @@ -129,7 +130,7 @@ public void reg(String path, Predicate predicate) throws IOException, In map.putIfAbsent(k, hf); fileMap.put(filePath, hf); - listener.onEvent(FileEvent.builder().type(EventType.create).fileName(file.getPath()).build()); + listener.onEvent(FileEvent.builder().type(EventType.create).fileKey(k).fileName(file.getPath()).build()); } } } @@ -167,7 +168,12 @@ private HeraFile initFile(File it) { log.info("initFile fileName:{},fileKey:{}", name, fileKey); map.put(hf.getFileKey(), hf); fileMap.put(hf.getFileName(), hf); - this.listener.onEvent(FileEvent.builder().pointer(pointer).type(EventType.init).fileName(hf.getFileName()).build()); + this.listener.onEvent(FileEvent.builder() + .pointer(pointer) + .type(EventType.init) + .fileName(hf.getFileName()) + .fileKey(hf.getFileKey()) + .build()); return hf; } catch (Exception e) { log.error("init file error,fileName:{}", name, e); diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java index d7b987194..56b6bd782 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java @@ -24,6 +24,8 @@ import org.junit.Test; import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -74,17 +76,27 @@ public void testLogFileMonitor() { monitor.setListener(new DefaultMonitorListener(monitor, readEvent -> { System.out.println(readEvent.getReadResult().getLines()); })); - String fileName = "/home/work/log/file.log.*"; + String fileName = "/home/work/log/test/file*.txt"; Pattern pattern = Pattern.compile(fileName); - monitor.reg("/home/work/log", it -> { + + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + scheduler.scheduleAtFixedRate(this::test, 1, 2, TimeUnit.SECONDS); + + monitor.reg("/home/work/log/test", it -> { boolean matches = pattern.matcher(it).matches(); - log.info("file:{},matches:{}", it, true); + log.info("file:{},matches:{}", it, matches); return true; }); log.info("reg finish"); System.in.read(); } + private void test() { + log.info("test save progress"); + FileInfoCache.ins().shutdown(); + } + @Test public void testLogWS() throws IOException { LogFile log = new LogFile("D:\\test.log", new ReadListener() {