diff --git a/jcommon/ai/google/src/test/java/run/mone/ai/google/test/ClientTest.java b/jcommon/ai/google/src/test/java/run/mone/ai/google/test/ClientTest.java index 575d843bf..8d8f74d68 100644 --- a/jcommon/ai/google/src/test/java/run/mone/ai/google/test/ClientTest.java +++ b/jcommon/ai/google/src/test/java/run/mone/ai/google/test/ClientTest.java @@ -22,7 +22,7 @@ public void test1() { RequestPayload payload = RequestPayload.builder().maxTokens(4000).anthropicVersion("vertex-2023-10-16").messages(Lists.newArrayList(Message.builder().role("user") .content(content) .build())).build(); - ResponsePayload r = c.call(c.token(), payload); + ResponsePayload r = c.call(c.token(""), payload); System.out.println(r.getContent().get(0).getText()); } } diff --git a/jcommon/es/pom.xml b/jcommon/es/pom.xml index 8312ca347..20d64f314 100644 --- a/jcommon/es/pom.xml +++ b/jcommon/es/pom.xml @@ -1,91 +1,75 @@ - - 4.0.0 - - run.mone - jcommon - 1.6.0-jdk21-SNAPSHOT - - es - 1.5-jdk21-SNAPSHOT - - - org.elasticsearch.client - elasticsearch-rest-high-level-client - 7.10.0 - - - httpclient - org.apache.httpcomponents - - - httpcore - org.apache.httpcomponents - - - httpcore-nio - org.apache.httpcomponents - - - - - org.elasticsearch.client - elasticsearch-rest-client-sniffer - 7.10.0 - - - org.apache.httpcomponents - httpclient - 4.5.12 - - - org.apache.httpcomponents - httpcore-nio - 4.4.13 - - - org.apache.httpcomponents - httpcore - 4.4.13 - - - run.mone - nacos - 1.4-v1-jdk20-SNAPSHOT - test - - - - - - maven-compiler-plugin - 3.11.0 - - 21 - 21 - true - UTF-8 - - ${project.basedir}/src/main/java - - - - - maven-source-plugin - 2.1 - - - compile - - jar - - - - - true - - - - + + 4.0.0 + + run.mone + jcommon + 1.6.0-jdk21-SNAPSHOT + + es + 1.7-jdk8-SNAPSHOT + + + UTF-8 + 7.17.21 + + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + ${es.version} + + + org.elasticsearch.client + elasticsearch-rest-client-sniffer + ${es.version} + + + org.elasticsearch + elasticsearch + ${es.version} + + + + run.mone + nacos + 1.4-v1-jdk20-SNAPSHOT + test + + + + + + maven-compiler-plugin + 3.11.0 + + 8 + 8 + true + UTF-8 + + ${project.basedir}/src/main/java + + + + + maven-source-plugin + 2.1 + + + compile + + jar + + + + + true + + + + diff --git a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java index e2b145d2f..764e1b8cd 100644 --- a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java +++ b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java @@ -25,18 +25,14 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.*; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.client.indices.*; import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer; import org.elasticsearch.client.sniff.SniffOnFailureListener; import org.elasticsearch.client.sniff.Sniffer; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -47,6 +43,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xcontent.XContentType; import java.io.IOException; import java.util.*; @@ -186,7 +183,9 @@ private RestClientBuilder createRestClientBuilder(List hosts, Header[] } private void initializeHighLevelClient(RestClientBuilder clientBuilder) { - this.client = new RestHighLevelClient(clientBuilder); + this.client = new RestHighLevelClientBuilder(clientBuilder.build()) + .setApiCompatibilityMode(true) + .build(); this.restClient = client.getLowLevelClient(); } diff --git a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsProcessor.java b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsProcessor.java index 7c9d7e71d..73e04962c 100644 --- a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsProcessor.java +++ b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsProcessor.java @@ -9,7 +9,7 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.TimeValue; import java.util.Map; import java.util.concurrent.TimeUnit; diff --git a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java index bc9b5a24f..24e4d3e84 100644 --- a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java +++ b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java @@ -14,7 +14,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.indices.GetMappingsResponse; import org.elasticsearch.client.indices.IndexTemplatesExistRequest; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; diff --git a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java index a6a7fb69b..12a50198f 100644 --- a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java +++ b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java @@ -1,69 +1,79 @@ -package com.xiaomi.mone.es.test; - -import com.xiaomi.data.push.nacos.NacosConfig; -import com.xiaomi.mone.es.EsProcessor; -import com.xiaomi.mone.es.EsClient; -import com.xiaomi.mone.es.ProcessorConf; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.junit.Test; - -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -public class EsProcessorClientTest { - - @Test - public void bulkInsert() throws InterruptedException { - - NacosConfig config = new NacosConfig(); - config.setDataId("zzy_new"); -// config.init(); - - String ip = config.getConfig("es_ip"); - String user = config.getConfig("es_user"); - String pwd = config.getConfig("es_password"); - ProcessorConf conf = new ProcessorConf(100, 5, 1, 100, 3, 5, new EsClient(ip, user, pwd), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - System.out.println("before insert" + request); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - System.out.println("success after,request:" + request.getDescription() + " resopnse:" + Arrays.toString(response.getItems())); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - System.out.println("success after,request:" + request + " failure:" + failure); - } - }); - EsProcessor processor = new EsProcessor(conf); - try { - String indexName = "zgq_common_milog_staging_free_private_1-" + new SimpleDateFormat("yyyy.MM.dd").format(new Date()); - Map data = new HashMap<>(); - data.put("timestamp", System.currentTimeMillis()); - data.put("filename", "/home/work/log/log-manager/server.log1"); - int n = 1; - int count = 0; - while (true) { +//package com.xiaomi.mone.es.test; +// +//import com.google.common.reflect.TypeToken; +//import com.google.gson.Gson; +//import com.xiaomi.data.push.nacos.NacosConfig; +//import com.xiaomi.mone.es.EsClient; +//import com.xiaomi.mone.es.EsProcessor; +//import com.xiaomi.mone.es.ProcessorConf; +//import org.elasticsearch.action.bulk.BulkProcessor; +//import org.elasticsearch.action.bulk.BulkRequest; +//import org.elasticsearch.action.bulk.BulkResponse; +//import org.junit.Test; +// +//import java.text.SimpleDateFormat; +//import java.time.Instant; +//import java.util.Arrays; +//import java.util.Date; +//import java.util.HashMap; +//import java.util.Map; +// +//public class EsProcessorClientTest { +// +// @Test +// public void bulkInsert() throws InterruptedException { +// +// String str = "{\"@timestamp\":\"2024-06-20T19:39:15.871+08:00\",\"@version\":\"1\",\"message\":\"hello world data test wtt~\",\"logger_name\":\"com.xiaomi.ai.Application\",\"thread_name\":\"http-nio-10010-exec-3\",\"level\":\"INFO\",\"level_value\":20000,\"LOG_NAME\":\"ai-workflow\",\"SENTRY_ENABLED\":\"false\",\"user_name\":\"wangjunfei3\",\"user_team\":\"ncl7150\",\"request_uri\":\"/hello\",\"trace_id\":\"9cf73bfe51e877a83806ac01b6630815\",\"trace_flags\":\"01\",\"span_id\":\"c13434f908acebb4\"}"; +// Map data = new Gson().fromJson(str, new TypeToken>() { +// }.getType()); +// data.put("timeStamp", System.currentTimeMillis()); +// +// NacosConfig config = new NacosConfig(); +// config.setDataId("zzy_new"); +//// config.init(); +// +// String ip = "zjydw.api.es.srv:80"; +// String user = config.getConfig("es_user"); +// String pwd = config.getConfig("es_password"); +// String token = "4244b7014a5c44fea63bea711c7697fe"; +// String catalog = "es_zjy_log"; +// String database = "default"; +// +// EsClient esClient = new EsClient(ip, token, catalog, database); +// ProcessorConf conf = new ProcessorConf(100, 5, 1, 100, 3, 5, esClient, new BulkProcessor.Listener() { +// @Override +// public void beforeBulk(long executionId, BulkRequest request) { +// System.out.println("before insert" + request); +// } +// +// @Override +// public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { +// System.out.println("success after,request:" + request.getDescription() + " resopnse:" + Arrays.toString(response.getItems())); +// } +// +// @Override +// public void afterBulk(long executionId, BulkRequest request, Throwable failure) { +// System.out.println("success after,request:" + request + " failure:" + failure); +// } +// }); +// EsProcessor processor = new EsProcessor(conf); +// try { +// String indexName = "prod_hera_index_95956-" + new SimpleDateFormat("yyyy.MM.dd").format(new Date()); +// int n = 1; +// int count = 0; +// while (true) { +//// processor.bulkInsert(indexName, data); // processor.bulkInsert(indexName, data); - processor.bulkInsert(indexName, data); - count++; - if (count == n) { - break; - } - } -// Thread.sleep(10000l); - System.in.read(); - }catch (Exception e){ - e.printStackTrace(); - } - - } -} \ No newline at end of file +// count++; +// if (count == n) { +// break; +// } +// } +//// Thread.sleep(10000l); +// System.in.read(); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// +// } +//} \ No newline at end of file diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java index f78f883e6..3889f4f08 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java @@ -8,6 +8,8 @@ */ public interface ILogFile { + int LINE_MAX_LENGTH = 1100000; + void readLine() throws IOException; void setStop(boolean stop); diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java index 71654948e..ddc7878a7 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java @@ -52,7 +52,7 @@ public class LogFile implements ILogFile { private String md5; - private static final int LINE_MAX_LENGTH = 50000; + // private static final int LINE_MAX_LENGTH = 50000; public LogFile() { diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java index e77743c3a..fbbd399c4 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java @@ -58,8 +58,6 @@ public class LogFile2 implements ILogFile { private String md5; - private static final int LINE_MAX_LENGTH = 50000; - public LogFile2() { } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFileWS.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFileWS.java index 08043057b..8b2ade870 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFileWS.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFileWS.java @@ -40,8 +40,6 @@ public class LogFileWS extends LogFile { private String md5; - private static final int LINE_MAX_LENGTH = 50000; - private WatchService ws; { diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/MLog.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/MLog.java index 0f96fa63d..6e5bbc6f4 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/MLog.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/MLog.java @@ -53,9 +53,9 @@ public class MLog { // private static final Pattern ERROR_LINE_PATTERN = Pattern.compile(".*ERROR|.*(WARN|INFO).*(Exception|exception|error|Profiler)"); // private static final int MAX_ERROR_LINE_MATCH_LENGTH = 300; /** - * 最多聚合200行错误栈,避免queue无限增长 + * 最多聚合800行错误栈,避免queue无限增长 */ - private static final int MAX_MERGE_LINE = 400; + private static final int MAX_MERGE_LINE = 800; @Deprecated public List append(String msg) { diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java index 968a5a83a..b065fb0db 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java @@ -78,9 +78,12 @@ public void load() { } } - @SneakyThrows public void load(String filePath) { - this.filePath = filePath; - this.load(); + try { + this.filePath = filePath; + this.load(); + } catch (Exception e) { + log.error("load cache error,filePath:{}", filePath, e); + } } } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java index 860e6537c..fb35d1cfc 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java @@ -8,4 +8,7 @@ public interface EventListener { void onEvent(FileEvent event); + default void remove(Object fileKey) { + } + } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java index 5347cd10f..f7a855994 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java @@ -8,11 +8,11 @@ import com.xiaomi.mone.file.event.EventType; import com.xiaomi.mone.file.event.FileEvent; import com.xiaomi.mone.file.ozhera.HeraFileMonitor; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @@ -28,8 +28,7 @@ public class DefaultMonitorListener implements EventListener { private Consumer consumer; - @Getter - private List readListenerList = new CopyOnWriteArrayList<>(); + private Map readListenerMap = new ConcurrentHashMap<>(); private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor(); @@ -44,7 +43,7 @@ public void onEvent(FileEvent event) { log.info("log file:{}", event.getFileName()); LogFile2 logFile = new LogFile2(event.getFileName()); OzHeraReadListener ozHeraReadListener = new OzHeraReadListener(monitor, logFile, consumer); - readListenerList.add(ozHeraReadListener); + readListenerMap.put(event.getFileKey(), ozHeraReadListener); pool.submit(() -> { logFile.setListener(ozHeraReadListener); SafeRun.run(logFile::readLine); @@ -58,6 +57,7 @@ public void onEvent(FileEvent event) { if (event.getType().equals(EventType.delete)) { log.info("delete:{}", event.getFileName()); + readListenerMap.remove(event.getFileKey()); } if (event.getType().equals(EventType.empty)) { @@ -71,11 +71,21 @@ public void onEvent(FileEvent event) { // LogFile2 logFile = new LogFile2(event.getFileName()); LogFile2 logFile = new LogFile2(event.getFileName(), 0, 0); OzHeraReadListener ozHeraReadListener = new OzHeraReadListener(monitor, logFile, consumer); - readListenerList.add(ozHeraReadListener); + readListenerMap.put(event.getFileKey(), ozHeraReadListener); + pool.submit(() -> { logFile.setListener(ozHeraReadListener); SafeRun.run(logFile::readLine); }); } } + + @Override + public void remove(Object fileKey) { + readListenerMap.remove(fileKey); + } + + public List getReadListenerList() { + return this.readListenerMap.values().stream().toList(); + } } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java index f7add647a..c8a2d1e84 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java @@ -39,6 +39,9 @@ public class HeraFileMonitor { @Setter private EventListener listener; + @Setter + private volatile boolean stop; + public HeraFileMonitor() { this(TimeUnit.SECONDS.toMillis(30)); } @@ -53,10 +56,12 @@ public HeraFileMonitor(long removeTime) { remList.add(Pair.of(it.getFileName(), it.getFileKey())); } }); + remList.forEach(it -> { log.info("remove file:{}", it.getKey()); fileMap.remove(it.getKey()); map.remove(it.getValue()); + listener.remove(it.getValue()); }); } catch (Throwable ex) { log.error(ex.getMessage(), ex); @@ -85,6 +90,7 @@ public void reg(String path, Predicate predicate) throws IOException, In directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE); while (true) { try { + WatchKey key = watchService.take(); try { for (WatchEvent event : key.pollEvents()) { diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java index 8644d3e01..d7b987194 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java @@ -78,7 +78,7 @@ public void testLogFileMonitor() { Pattern pattern = Pattern.compile(fileName); monitor.reg("/home/work/log", it -> { boolean matches = pattern.matcher(it).matches(); - log.info("file:{},matches:{}", it, matches); + log.info("file:{},matches:{}", it, true); return true; }); log.info("reg finish");