diff --git a/example/src/test/java/io/mycat/xa/BaseSavepointSuite.java b/example/src/test/java/io/mycat/xa/BaseSavepointSuite.java index f85b43e35..c89a6bd4e 100644 --- a/example/src/test/java/io/mycat/xa/BaseSavepointSuite.java +++ b/example/src/test/java/io/mycat/xa/BaseSavepointSuite.java @@ -28,7 +28,7 @@ public void baseSavepointCommit(VertxTestContext testContext) { SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -55,7 +55,7 @@ public void baseSavepointSavepointCommit(VertxTestContext testContext) { SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -99,7 +99,7 @@ public void baseSavepointRollback(VertxTestContext testContext) { SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -120,7 +120,7 @@ public void baseSavepointRelease(VertxTestContext testContext) { SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -142,7 +142,7 @@ public void baseSavepointRollbackSavepoint(VertxTestContext testContext) { SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -168,7 +168,7 @@ public void baseSavepointCommitInSingleConnection(VertxTestContext testContext) SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -215,7 +215,7 @@ public void baseSavepointRollbackInSingleConnection(VertxTestContext testContext SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -262,7 +262,7 @@ public void baseSavepointRollbackSavepointInSingleConnection(VertxTestContext te SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -320,7 +320,7 @@ public void baseSavepointReleaseSavepointInSingleConnection(VertxTestContext tes SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -385,7 +385,7 @@ public void baseSavepointReleaseSavepointInTwoConnection(VertxTestContext testCo SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -479,7 +479,7 @@ public void baseSavepointRollbackSavepointInTwoConnection(VertxTestContext testC SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -570,7 +570,7 @@ public void baseSavepointCommitSavepointInTwoConnection(VertxTestContext testCon SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { @@ -645,7 +645,7 @@ public void baseSavepointRollbackInTwoConnection(VertxTestContext testContext) { SavepointSqlConnection savepointSqlConnection = (SavepointSqlConnection) baseXaSqlConnection; - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override @SneakyThrows public void handle(AsyncResult event) { diff --git a/example/src/test/java/io/mycat/xa/XaTestSuite.java b/example/src/test/java/io/mycat/xa/XaTestSuite.java index d8cd18257..a5237e306 100644 --- a/example/src/test/java/io/mycat/xa/XaTestSuite.java +++ b/example/src/test/java/io/mycat/xa/XaTestSuite.java @@ -91,7 +91,7 @@ private void extracteInitSql(Connection mySQLConnection) throws SQLException { @Test public void begin(VertxTestContext testContext) { XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { Assertions.assertEquals(baseXaSqlConnection.isInTransaction(), true); @@ -104,7 +104,7 @@ public void handle(AsyncResult event) { @Test public void beginCommit(VertxTestContext testContext) { XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.begin(event -> baseXaSqlConnection.commit(new Handler>() { + baseXaSqlConnection.begin().onComplete(event -> baseXaSqlConnection.commit().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { Assertions.assertEquals(baseXaSqlConnection.isInTransaction(), false); @@ -117,7 +117,7 @@ public void handle(AsyncResult event) { @Test public void beginRollback(VertxTestContext testContext) { XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.begin(event -> baseXaSqlConnection.rollback(new Handler>() { + baseXaSqlConnection.begin().onComplete(event -> baseXaSqlConnection.rollback().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { Assertions.assertEquals(baseXaSqlConnection.isInTransaction(), false); @@ -131,10 +131,10 @@ public void handle(AsyncResult event) { @Disabled public void beginBegin(VertxTestContext testContext) { XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { - baseXaSqlConnection.begin(event1 -> { + baseXaSqlConnection.begin().onComplete(event1 -> { Assertions.assertTrue(event1.failed()); testContext.completeNow(); }); @@ -146,7 +146,7 @@ public void handle(AsyncResult event) { @Test public void rollback(VertxTestContext testContext) { XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.rollback(new Handler>() { + baseXaSqlConnection.rollback().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { Assertions.assertTrue(event.succeeded()); @@ -159,7 +159,7 @@ public void handle(AsyncResult event) { @Test public void commit(VertxTestContext testContext) { XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.commit(new Handler>() { + baseXaSqlConnection.commit().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { Assertions.assertTrue(event.succeeded()); @@ -172,7 +172,7 @@ public void handle(AsyncResult event) { @Test public void close(VertxTestContext testContext) { XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.close(new Handler>() { + baseXaSqlConnection.close().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { Assertions.assertTrue(event.succeeded()); @@ -185,10 +185,10 @@ public void handle(AsyncResult event) { @Test public void closeInTranscation(VertxTestContext testContext) { XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.begin(new Handler>() { + baseXaSqlConnection.begin().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { - baseXaSqlConnection.close(new Handler>() { + baseXaSqlConnection.close().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { Assertions.assertTrue(event.succeeded()); @@ -204,7 +204,7 @@ public void handle(AsyncResult event) { public void beginSingleTargetInsertCommit(VertxTestContext testContext) throws Exception { clearData(); XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.begin(event -> { + baseXaSqlConnection.begin().onComplete(event -> { Assertions.assertTrue(event.succeeded()); Future ds1 = baseXaSqlConnection.getConnection("ds1"); ds1.compose(connection -> { @@ -224,7 +224,7 @@ public void beginSingleTargetInsertCommit(VertxTestContext testContext) throws E }); }).onComplete(event13 -> { Assertions.assertTrue(event13.succeeded()); - baseXaSqlConnection.commit(event12 -> { + baseXaSqlConnection.commit().onComplete(event12 -> { Assertions.assertTrue(event12.succeeded()); Assertions.assertFalse(baseXaSqlConnection.isInTransaction()); Future connectionFuture = @@ -247,7 +247,7 @@ public void beginSingleTargetInsertCommit(VertxTestContext testContext) throws E public void beginDoubleTargetInsertCommit(VertxTestContext testContext) throws Exception { clearData(); XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.begin(event -> { + baseXaSqlConnection.begin().onComplete(event -> { Assertions.assertTrue(event.succeeded()); Future ds1 = baseXaSqlConnection.getConnection("ds1"); Future ds2 = baseXaSqlConnection.getConnection("ds2"); @@ -273,7 +273,7 @@ public void beginDoubleTargetInsertCommit(VertxTestContext testContext) throws E })); all.onComplete(event13 -> { Assertions.assertTrue(event13.succeeded()); - baseXaSqlConnection.commit(event12 -> { + baseXaSqlConnection.commit().onComplete(event12 -> { Assertions.assertTrue(event12.succeeded()); Assertions.assertFalse(baseXaSqlConnection.isInTransaction()); Future connectionFuture = @@ -295,7 +295,7 @@ public void beginDoubleTargetInsertCommit(VertxTestContext testContext) throws E public void beginDoubleTargetInsertButStatementFail(VertxTestContext testContext) throws Exception { clearData(); XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.begin(event -> { + baseXaSqlConnection.begin().onComplete(event -> { Assertions.assertTrue(event.succeeded()); Future ds1 = baseXaSqlConnection.getConnection("ds1"); Future ds2 = baseXaSqlConnection.getConnection("ds2"); @@ -321,7 +321,7 @@ public void beginDoubleTargetInsertButStatementFail(VertxTestContext testContext })); all.onComplete(event13 -> { Assertions.assertTrue(event13.failed()); - baseXaSqlConnection.rollback(new Handler>() { + baseXaSqlConnection.rollback().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { Assertions.assertTrue(event.succeeded()); @@ -346,7 +346,7 @@ public void handle(AsyncResult event) { public void beginDoubleTargetInsertButPrepareFail(VertxTestContext testContext) throws Exception { clearData(); XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.begin(event -> { + baseXaSqlConnection.begin().onComplete(event -> { Assertions.assertTrue(event.succeeded()); Future ds1 = baseXaSqlConnection.getConnection("ds1"); Future ds2 = baseXaSqlConnection.getConnection("ds2"); @@ -377,7 +377,7 @@ public void beginDoubleTargetInsertButPrepareFail(VertxTestContext testContext) @Override public void handle(AsyncResult event) { Assertions.assertTrue(event.failed()); - baseXaSqlConnection.rollback(new Handler>() { + baseXaSqlConnection.rollback().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { Assertions.assertTrue(event.succeeded()); @@ -405,7 +405,7 @@ public void handle(AsyncResult event) { public void beginDoubleTargetInsertButCommitFail(VertxTestContext testContext) throws Exception { clearData(); XaSqlConnection baseXaSqlConnection = factory.apply(mySQLManager,xaLog); - baseXaSqlConnection.begin(event -> { + baseXaSqlConnection.begin().onComplete(event -> { Assertions.assertTrue(event.succeeded()); Future ds1 = baseXaSqlConnection.getConnection("ds1"); Future ds2 = baseXaSqlConnection.getConnection("ds2"); @@ -436,7 +436,7 @@ public void beginDoubleTargetInsertButCommitFail(VertxTestContext testContext) t @Override public void handle(AsyncResult event) { Assertions.assertTrue(event.failed()); - baseXaSqlConnection.commit(new Handler>() { + baseXaSqlConnection.commit().onComplete(new Handler>() { @Override public void handle(AsyncResult event) { Assertions.assertTrue(event.succeeded()); diff --git a/mycat2/src/main/java/io/mycat/commands/MycatMySQLManagerImpl.java b/mycat2/src/main/java/io/mycat/commands/MycatMySQLManagerImpl.java index 046acb58a..bf241f7a2 100644 --- a/mycat2/src/main/java/io/mycat/commands/MycatMySQLManagerImpl.java +++ b/mycat2/src/main/java/io/mycat/commands/MycatMySQLManagerImpl.java @@ -151,7 +151,7 @@ public Future> computeConnectionUsageSnapshot() { HashMap resMap = new HashMap<>(); for (Map.Entry entry : map.entrySet()) { MycatDatasourcePool pool = entry.getValue(); - Integer n = pool.getAvailableNumber(); + Integer n = pool.getUsedNumber(); resMap.put(entry.getKey(), n); } return Future.succeededFuture(resMap); diff --git a/mycat2/src/main/java/io/mycat/exporter/ConnectionCounterCollector.java b/mycat2/src/main/java/io/mycat/exporter/ConnectionCounterCollector.java new file mode 100644 index 000000000..1025df116 --- /dev/null +++ b/mycat2/src/main/java/io/mycat/exporter/ConnectionCounterCollector.java @@ -0,0 +1,116 @@ +package io.mycat.exporter; + +import cn.mycat.vertx.xa.MySQLManager; +import com.google.common.collect.ImmutableList; +import io.mycat.MetaClusterCurrent; +import io.mycat.MycatServer; +import io.mycat.datasource.jdbc.datasource.JdbcConnectionManager; +import io.mycat.datasource.jdbc.datasource.JdbcDataSource; +import io.mycat.replica.PhysicsInstance; +import io.mycat.replica.ReplicaSelectorManager; +import io.prometheus.client.Collector; +import io.prometheus.client.GaugeMetricFamily; +import io.vertx.core.Future; +import lombok.Getter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class ConnectionCounterCollector extends Collector { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionCounterCollector.class); + + public ConnectionCounterCollector() { + } + + @Getter + public static enum Type { + TOTAL("total"), + IDLE("idle"), + USE("use"); + + Type(String name) { + this.name = name; + } + + private final String name; + } + + @Override + public List collect() { + try { + return ImmutableList.of(countCurrentClient(), + countCurrentProxyBackend(), + countCurrentJdbcBackend(), + countTotalBackend()); + } catch (Throwable e) { + LOGGER.error("", e); + throw e; + } + } + + private static GaugeMetricFamily countCurrentClient() { + MycatServer mycatServer; + long sum = 0; + if (MetaClusterCurrent.exist(MycatServer.class)) { + mycatServer = MetaClusterCurrent.wrapper(MycatServer.class); + sum = mycatServer.countConnection(); + } + GaugeMetricFamily gaugeMetricFamily = + new GaugeMetricFamily("client_connection", "mycat session or connection", sum); + return gaugeMetricFamily; + } + + private static GaugeMetricFamily countCurrentProxyBackend() { + GaugeMetricFamily gaugeMetricFamily = new GaugeMetricFamily("native_mysql_connection", "mysql naivte backend connection", + ImmutableList.of("type", "datasource")); + + if(MetaClusterCurrent.exist(MySQLManager.class)){ + MySQLManager mySQLManager = MetaClusterCurrent.wrapper(MySQLManager.class); + Future> mapFuture = mySQLManager.computeConnectionUsageSnapshot(); + Map integerMap = Collections.emptyMap(); + try { + integerMap = mapFuture.toCompletionStage().toCompletableFuture().get(1, TimeUnit.SECONDS); + }catch (Exception exception){ + LOGGER.error("",exception); + } + for (Map.Entry entry : integerMap.entrySet()) { + gaugeMetricFamily.addMetric(ImmutableList.of(Type.USE.getName(), entry.getKey()), + entry.getValue()); + } + } + return gaugeMetricFamily; + } + + private static GaugeMetricFamily countCurrentJdbcBackend() { + GaugeMetricFamily gaugeMetricFamily = new GaugeMetricFamily("jdbc_connection", + "jdbc backend connection", + ImmutableList.of("type", "datasource")); + + if(MetaClusterCurrent.exist(JdbcConnectionManager.class)){ + JdbcConnectionManager jdbcConnectionManager = MetaClusterCurrent.wrapper(JdbcConnectionManager.class); + for (JdbcDataSource jdbcDataSource : jdbcConnectionManager.getDatasourceInfo().values()) { + gaugeMetricFamily.addMetric(ImmutableList.of(Type.USE.getName(), jdbcDataSource.getName()), + jdbcDataSource.getUsedCount()); + } + } + return gaugeMetricFamily; + } + + private static GaugeMetricFamily countTotalBackend() { + GaugeMetricFamily gaugeMetricFamily = new GaugeMetricFamily("instance_connection", + "jdbc + native_mysql backend connection", + ImmutableList.of("type", "datasource")); + if(MetaClusterCurrent.exist(ReplicaSelectorManager.class)){ + ReplicaSelectorManager replicaSelectorManager = MetaClusterCurrent.wrapper(ReplicaSelectorManager.class); + for (PhysicsInstance physicsInstance : replicaSelectorManager.getPhysicsInstances()) { + gaugeMetricFamily.addMetric(ImmutableList.of(Type.TOTAL.getName(), physicsInstance.getName()), physicsInstance.getSessionCounter()); + } + } + return gaugeMetricFamily; + } +} \ No newline at end of file diff --git a/mycat2/src/main/java/io/mycat/exporter/HeartbeatCollector.java b/mycat2/src/main/java/io/mycat/exporter/HeartbeatCollector.java new file mode 100644 index 000000000..5c5a9c19d --- /dev/null +++ b/mycat2/src/main/java/io/mycat/exporter/HeartbeatCollector.java @@ -0,0 +1,45 @@ +package io.mycat.exporter; + +import com.google.common.collect.ImmutableList; +import io.mycat.api.collector.RowBaseIterator; +import io.mycat.sqlhandler.dql.HintHandler; +import io.prometheus.client.Collector; +import io.prometheus.client.GaugeMetricFamily; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Timestamp; +import java.util.*; +import java.util.stream.Collectors; + +public class HeartbeatCollector extends Collector { + private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatCollector.class); + + @Override + public List collect() { + try { + RowBaseIterator rowBaseIterator = HintHandler.showHeatbeatStat(); + List columnList = ImmutableList.of("NAME"); + List> resultSetMap = rowBaseIterator.getResultSetMap(); + GaugeMetricFamily gaugeMetricFamily = new GaugeMetricFamily("heartbeat_stat", + "heartbeat_stat", + columnList); + for (Map stringObjectMap : resultSetMap) { + Timestamp LAST_SEND_QUERY_TIME = Timestamp.valueOf((java.time.LocalDateTime) stringObjectMap.get("LAST_SEND_QUERY_TIME")); + Timestamp LAST_RECEIVED_QUERY_TIME = Timestamp.valueOf((java.time.LocalDateTime) stringObjectMap.get("LAST_RECEIVED_QUERY_TIME")); + //查询时间之差 + long l = LAST_RECEIVED_QUERY_TIME.getTime() - LAST_SEND_QUERY_TIME.getTime(); + List collect = columnList.stream().map(i -> stringObjectMap.get(i)) + .map(i -> Objects.toString(i)).collect(Collectors.toList()); + + gaugeMetricFamily.addMetric( + collect, + l); + } + return ImmutableList.of(gaugeMetricFamily); + } catch (Throwable e) { + LOGGER.error("", e); + throw e; + } + } +} \ No newline at end of file diff --git a/mycat2/src/main/java/io/mycat/exporter/InstanceCollector.java b/mycat2/src/main/java/io/mycat/exporter/InstanceCollector.java new file mode 100644 index 000000000..4f6d319da --- /dev/null +++ b/mycat2/src/main/java/io/mycat/exporter/InstanceCollector.java @@ -0,0 +1,33 @@ +package io.mycat.exporter; + +import com.google.common.collect.ImmutableList; +import io.mycat.MetaClusterCurrent; +import io.mycat.api.collector.RowBaseIterator; +import io.mycat.replica.PhysicsInstance; +import io.mycat.replica.ReplicaSelectorManager; +import io.prometheus.client.Collector; +import io.prometheus.client.GaugeMetricFamily; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class InstanceCollector extends Collector { + @Override + public List collect() { + List columnList = ImmutableList.of("NAME"); + GaugeMetricFamily gaugeMetricFamily = new GaugeMetricFamily("instance_active", + "instance_active", + columnList); + + if (MetaClusterCurrent.exist(ReplicaSelectorManager.class)) { + ReplicaSelectorManager replicaSelectorManager = MetaClusterCurrent.wrapper(ReplicaSelectorManager.class); + for (PhysicsInstance physicsInstance : replicaSelectorManager.getPhysicsInstances()) { + int value = physicsInstance.isAlive() ? 1 : 0; + gaugeMetricFamily.addMetric(columnList, value); + } + } + return ImmutableList.of(gaugeMetricFamily); + } +} \ No newline at end of file diff --git a/mycat2/src/main/java/io/mycat/exporter/PrometheusExporter.java b/mycat2/src/main/java/io/mycat/exporter/PrometheusExporter.java index 32d3a7626..a380fe6f9 100644 --- a/mycat2/src/main/java/io/mycat/exporter/PrometheusExporter.java +++ b/mycat2/src/main/java/io/mycat/exporter/PrometheusExporter.java @@ -16,7 +16,7 @@ public class PrometheusExporter implements Runnable { @Override public void run() { MycatServerConfig mycatServerConfig = MetaClusterCurrent.wrapper(MycatServerConfig.class); - Optional.ofNullable(mycatServerConfig.getProperties().getOrDefault("prometheusPort",null)) + Optional.ofNullable(mycatServerConfig.getProperties().getOrDefault("prometheusPort",7066)) .ifPresent(port->{ try { CollectorList collectorList = new CollectorList( @@ -30,7 +30,12 @@ public void run() { ////////////////////////////////////////// new SqlStatCollector(), new ReplicaCollector(), - new CPULoadCollector() + new CPULoadCollector(), + ///////////////////////////////////////// + new ConnectionCounterCollector(), + new HeartbeatCollector(), + new InstanceCollector(), + new ThreadPoolCollector() ); collectorList.register(); LOGGER.info("PrometheusExporter start server port:"+port); diff --git a/mycat2/src/main/java/io/mycat/exporter/ThreadPoolCollector.java b/mycat2/src/main/java/io/mycat/exporter/ThreadPoolCollector.java new file mode 100644 index 000000000..de4ea790a --- /dev/null +++ b/mycat2/src/main/java/io/mycat/exporter/ThreadPoolCollector.java @@ -0,0 +1,35 @@ +package io.mycat.exporter; + +import com.google.common.collect.ImmutableList; +import io.mycat.IOExecutor; +import io.mycat.MetaClusterCurrent; +import io.prometheus.client.Collector; +import io.prometheus.client.GaugeMetricFamily; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ThreadPoolCollector extends Collector { + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolCollector.class); + + @Override + public List collect() { + try { + List columnList = ImmutableList.of("ACTIVE_COUNT"); + GaugeMetricFamily gaugeMetricFamily = new GaugeMetricFamily("thread_pool_active", + "thread_pool_active", + columnList); + long count = 0; + if(MetaClusterCurrent.exist(IOExecutor.class)){ + IOExecutor ioExecutor = MetaClusterCurrent.wrapper(IOExecutor.class); + count = ioExecutor.count(); + } + gaugeMetricFamily.addMetric(columnList, count); + return ImmutableList.of(gaugeMetricFamily); + } catch (Throwable e) { + LOGGER.error("", e); + throw e; + } + } +} \ No newline at end of file diff --git a/mycat2/src/main/java/io/mycat/sqlhandler/dql/HintHandler.java b/mycat2/src/main/java/io/mycat/sqlhandler/dql/HintHandler.java index f59bda577..1fb3a7dde 100644 --- a/mycat2/src/main/java/io/mycat/sqlhandler/dql/HintHandler.java +++ b/mycat2/src/main/java/io/mycat/sqlhandler/dql/HintHandler.java @@ -88,7 +88,7 @@ protected Future onExecute(SQLRequest request, MycatDa try { if (hints.size() == 1) { String s = SqlHints.unWrapperHint(hints.get(0).getText()); - if (s.startsWith("mycat:")||s.startsWith("MYCAT:")) { + if (s.startsWith("mycat:") || s.startsWith("MYCAT:")) { s = s.substring(6); int bodyStartIndex = s.indexOf('{'); String cmd; @@ -308,94 +308,8 @@ protected Future onExecute(SQLRequest request, MycatDa return response.sendResultSet(() -> resultSetBuilder.build()); } if ("showHeartbeats".equalsIgnoreCase(cmd)) { - Map dataSourceConfig = routerConfig.getDatasources().stream().collect(Collectors.toMap(k -> k.getName(), v -> v)); - - ResultSetBuilder resultSetBuilder = ResultSetBuilder.create(); - - resultSetBuilder.addColumnInfo("NAME", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("TYPE", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("READABLE", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("SESSION_COUNT", JDBCType.BIGINT); - resultSetBuilder.addColumnInfo("WEIGHT", JDBCType.BIGINT); - resultSetBuilder.addColumnInfo("ALIVE", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("MASTER", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("LIMIT_SESSION_COUNT", JDBCType.BIGINT); - resultSetBuilder.addColumnInfo("REPLICA", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("SLAVE_THRESHOLD", JDBCType.BIGINT); - resultSetBuilder.addColumnInfo("IS_HEARTBEAT_TIMEOUT", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("HB_ERROR_COUNT", JDBCType.BIGINT); - resultSetBuilder.addColumnInfo("HB_LAST_SWITCH_TIME", JDBCType.TIMESTAMP); - resultSetBuilder.addColumnInfo("HB_MAX_RETRY", JDBCType.BIGINT); - resultSetBuilder.addColumnInfo("IS_CHECKING", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("MIN_SWITCH_TIME_INTERVAL", JDBCType.BIGINT); - resultSetBuilder.addColumnInfo("HEARTBEAT_TIMEOUT", JDBCType.BIGINT); - resultSetBuilder.addColumnInfo("SYNC_DS_STATUS", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("HB_DS_STATUS", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("IS_SLAVE_BEHIND_MASTER", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("LAST_SEND_QUERY_TIME", JDBCType.TIMESTAMP); - resultSetBuilder.addColumnInfo("LAST_RECEIVED_QUERY_TIME", JDBCType.TIMESTAMP); - - - for (HeartbeatFlow heartbeatFlow : replicaSelectorRuntime.getHeartbeatDetectorMap().values()) { - PhysicsInstance instance = heartbeatFlow.instance(); - - String NAME = instance.getName(); - String TYPE = instance.getType().name(); - boolean READABLE = instance.asSelectRead(); - int SESSION_COUNT = instance.getSessionCounter(); - int WEIGHT = instance.getWeight(); - boolean ALIVE = instance.isAlive(); - boolean MASTER = instance.isMaster(); - - double SLAVE_THRESHOLD = heartbeatFlow.getSlaveThreshold(); - - boolean IS_HEARTBEAT_TIMEOUT = heartbeatFlow.isHeartbeatTimeout(); - final HeartBeatStatus HEART_BEAT_STATUS = heartbeatFlow.getHbStatus(); - int HB_ERROR_COUNT = HEART_BEAT_STATUS.getErrorCount(); - LocalDateTime HB_LAST_SWITCH_TIME = - new Timestamp(HEART_BEAT_STATUS.getLastSwitchTime()).toLocalDateTime(); - int HB_MAX_RETRY = HEART_BEAT_STATUS.getMaxRetry(); - boolean IS_CHECKING = HEART_BEAT_STATUS.isChecking(); - long MIN_SWITCH_TIME_INTERVAL = HEART_BEAT_STATUS.getMinSwitchTimeInterval(); - final long HEARTBEAT_TIMEOUT = (heartbeatFlow.getHeartbeatTimeout()); - DatasourceStatus DS_STATUS_OBJECT = heartbeatFlow.getDsStatus(); - String SYNC_DS_STATUS = DS_STATUS_OBJECT.getDbSynStatus().name(); - String HB_DS_STATUS = DS_STATUS_OBJECT.getStatus().name(); - boolean IS_SLAVE_BEHIND_MASTER = DS_STATUS_OBJECT.isSlaveBehindMaster(); - LocalDateTime LAST_SEND_QUERY_TIME = - new Timestamp(heartbeatFlow.getLastSendQryTime()).toLocalDateTime(); - LocalDateTime LAST_RECEIVED_QUERY_TIME = - new Timestamp(heartbeatFlow.getLastReceivedQryTime()).toLocalDateTime(); - Optional e = Optional.ofNullable(dataSourceConfig.get(NAME)); - - String replicaDataSourceSelectorList = String.join(",", replicaSelectorRuntime.getReplicaNameListByInstanceName(NAME)); - - resultSetBuilder.addObjectRowPayload( - Arrays.asList(NAME, - TYPE, - READABLE, - SESSION_COUNT, - WEIGHT, - ALIVE, - MASTER, - e.map(i -> i.getMaxCon()).orElse(-1), - replicaDataSourceSelectorList, - SLAVE_THRESHOLD, - IS_HEARTBEAT_TIMEOUT, - HB_ERROR_COUNT, - HB_LAST_SWITCH_TIME, - HB_MAX_RETRY, - IS_CHECKING, - MIN_SWITCH_TIME_INTERVAL, - HEARTBEAT_TIMEOUT, - SYNC_DS_STATUS, - HB_DS_STATUS, - IS_SLAVE_BEHIND_MASTER, - LAST_SEND_QUERY_TIME, - LAST_RECEIVED_QUERY_TIME - )); - } - return response.sendResultSet(resultSetBuilder.build()); + RowBaseIterator rowBaseIterator = showHeatbeatStat(); + return response.sendResultSet(rowBaseIterator); } if ("showHeartbeatStatus".equalsIgnoreCase(cmd)) { ResultSetBuilder builder = ResultSetBuilder.create(); @@ -413,45 +327,8 @@ protected Future onExecute(SQLRequest request, MycatDa return response.sendResultSet(() -> builder.build()); } if ("showInstances".equalsIgnoreCase(cmd)) { - ResultSetBuilder resultSetBuilder = ResultSetBuilder.create(); - resultSetBuilder.addColumnInfo("NAME", JDBCType.VARCHAR); - - resultSetBuilder.addColumnInfo("ALIVE", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("READABLE", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("TYPE", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("SESSION_COUNT", JDBCType.BIGINT); - resultSetBuilder.addColumnInfo("WEIGHT", JDBCType.BIGINT); - - resultSetBuilder.addColumnInfo("MASTER", JDBCType.VARCHAR); - resultSetBuilder.addColumnInfo("LIMIT_SESSION_COUNT", JDBCType.BIGINT); - resultSetBuilder.addColumnInfo("REPLICA", JDBCType.VARCHAR); - Collection values = - replicaSelectorRuntime.getPhysicsInstances(); - Map dataSourceConfig = routerConfig.getDatasources().stream().collect(Collectors.toMap(k -> k.getName(), v -> v)); - - - for (PhysicsInstance instance : values) { - - String NAME = instance.getName(); - String TYPE = instance.getType().name(); - boolean READABLE = instance.asSelectRead(); - int SESSION_COUNT = instance.getSessionCounter(); - int WEIGHT = instance.getWeight(); - boolean ALIVE = instance.isAlive(); - boolean MASTER = instance.isMaster(); - - Optional e = Optional.ofNullable(dataSourceConfig.get(NAME)); - - - String replicaDataSourceSelectorList = String.join(",", replicaSelectorRuntime.getReplicaNameListByInstanceName(NAME)); - - resultSetBuilder.addObjectRowPayload( - Arrays.asList(NAME, ALIVE, READABLE, TYPE, SESSION_COUNT, WEIGHT, MASTER, - e.map(i -> i.getMaxCon()).orElse(-1), - replicaDataSourceSelectorList - )); - } - return response.sendResultSet(resultSetBuilder.build()); + RowBaseIterator rowBaseIterator = showInstances(); + return response.sendResultSet(rowBaseIterator); } if ("showReactors".equalsIgnoreCase(cmd)) { MycatServer server = MetaClusterCurrent.wrapper(MycatServer.class); @@ -638,6 +515,147 @@ protected Future onExecute(SQLRequest request, MycatDa } } + public static RowBaseIterator showHeatbeatStat() { + ResultSetBuilder resultSetBuilder = ResultSetBuilder.create(); + resultSetBuilder.addColumnInfo("NAME", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("TYPE", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("READABLE", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("SESSION_COUNT", JDBCType.BIGINT); + resultSetBuilder.addColumnInfo("WEIGHT", JDBCType.BIGINT); + resultSetBuilder.addColumnInfo("ALIVE", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("MASTER", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("LIMIT_SESSION_COUNT", JDBCType.BIGINT); + resultSetBuilder.addColumnInfo("REPLICA", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("SLAVE_THRESHOLD", JDBCType.BIGINT); + resultSetBuilder.addColumnInfo("IS_HEARTBEAT_TIMEOUT", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("HB_ERROR_COUNT", JDBCType.BIGINT); + resultSetBuilder.addColumnInfo("HB_LAST_SWITCH_TIME", JDBCType.TIMESTAMP); + resultSetBuilder.addColumnInfo("HB_MAX_RETRY", JDBCType.BIGINT); + resultSetBuilder.addColumnInfo("IS_CHECKING", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("MIN_SWITCH_TIME_INTERVAL", JDBCType.BIGINT); + resultSetBuilder.addColumnInfo("HEARTBEAT_TIMEOUT", JDBCType.BIGINT); + resultSetBuilder.addColumnInfo("SYNC_DS_STATUS", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("HB_DS_STATUS", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("IS_SLAVE_BEHIND_MASTER", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("LAST_SEND_QUERY_TIME", JDBCType.TIMESTAMP); + resultSetBuilder.addColumnInfo("LAST_RECEIVED_QUERY_TIME", JDBCType.TIMESTAMP); + + + if (MetaClusterCurrent.exist(MycatRouterConfig.class) && MetaClusterCurrent.exist(ReplicaSelectorManager.class)) { + MycatRouterConfig routerConfig = MetaClusterCurrent.wrapper(MycatRouterConfig.class); + ReplicaSelectorManager replicaSelectorRuntime = MetaClusterCurrent.wrapper(ReplicaSelectorManager.class); + Map dataSourceConfig = routerConfig.getDatasources().stream().collect(Collectors.toMap(k -> k.getName(), v -> v)); + + for (HeartbeatFlow heartbeatFlow : replicaSelectorRuntime.getHeartbeatDetectorMap().values()) { + PhysicsInstance instance = heartbeatFlow.instance(); + + String NAME = instance.getName(); + String TYPE = instance.getType().name(); + boolean READABLE = instance.asSelectRead(); + int SESSION_COUNT = instance.getSessionCounter(); + int WEIGHT = instance.getWeight(); + boolean ALIVE = instance.isAlive(); + boolean MASTER = instance.isMaster(); + + double SLAVE_THRESHOLD = heartbeatFlow.getSlaveThreshold(); + + boolean IS_HEARTBEAT_TIMEOUT = heartbeatFlow.isHeartbeatTimeout(); + final HeartBeatStatus HEART_BEAT_STATUS = heartbeatFlow.getHbStatus(); + int HB_ERROR_COUNT = HEART_BEAT_STATUS.getErrorCount(); + LocalDateTime HB_LAST_SWITCH_TIME = + new Timestamp(HEART_BEAT_STATUS.getLastSwitchTime()).toLocalDateTime(); + int HB_MAX_RETRY = HEART_BEAT_STATUS.getMaxRetry(); + boolean IS_CHECKING = HEART_BEAT_STATUS.isChecking(); + long MIN_SWITCH_TIME_INTERVAL = HEART_BEAT_STATUS.getMinSwitchTimeInterval(); + final long HEARTBEAT_TIMEOUT = (heartbeatFlow.getHeartbeatTimeout()); + DatasourceStatus DS_STATUS_OBJECT = heartbeatFlow.getDsStatus(); + String SYNC_DS_STATUS = DS_STATUS_OBJECT.getDbSynStatus().name(); + String HB_DS_STATUS = DS_STATUS_OBJECT.getStatus().name(); + boolean IS_SLAVE_BEHIND_MASTER = DS_STATUS_OBJECT.isSlaveBehindMaster(); + LocalDateTime LAST_SEND_QUERY_TIME = + new Timestamp(heartbeatFlow.getLastSendQryTime()).toLocalDateTime(); + LocalDateTime LAST_RECEIVED_QUERY_TIME = + new Timestamp(heartbeatFlow.getLastReceivedQryTime()).toLocalDateTime(); + Optional e = Optional.ofNullable(dataSourceConfig.get(NAME)); + + String replicaDataSourceSelectorList = String.join(",", replicaSelectorRuntime.getReplicaNameListByInstanceName(NAME)); + + resultSetBuilder.addObjectRowPayload( + Arrays.asList(NAME, + TYPE, + READABLE, + SESSION_COUNT, + WEIGHT, + ALIVE, + MASTER, + e.map(i -> i.getMaxCon()).orElse(-1), + replicaDataSourceSelectorList, + SLAVE_THRESHOLD, + IS_HEARTBEAT_TIMEOUT, + HB_ERROR_COUNT, + HB_LAST_SWITCH_TIME, + HB_MAX_RETRY, + IS_CHECKING, + MIN_SWITCH_TIME_INTERVAL, + HEARTBEAT_TIMEOUT, + SYNC_DS_STATUS, + HB_DS_STATUS, + IS_SLAVE_BEHIND_MASTER, + LAST_SEND_QUERY_TIME, + LAST_RECEIVED_QUERY_TIME + )); + } + } + RowBaseIterator rowBaseIterator = resultSetBuilder.build(); + return rowBaseIterator; + } + + public static RowBaseIterator showInstances() { + MycatRouterConfig routerConfig = MetaClusterCurrent.wrapper(MycatRouterConfig.class); + ReplicaSelectorManager replicaSelectorRuntime = MetaClusterCurrent.wrapper(ReplicaSelectorManager.class); + + ResultSetBuilder resultSetBuilder = ResultSetBuilder.create(); + resultSetBuilder.addColumnInfo("NAME", JDBCType.VARCHAR); + + resultSetBuilder.addColumnInfo("ALIVE", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("READABLE", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("TYPE", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("SESSION_COUNT", JDBCType.BIGINT); + resultSetBuilder.addColumnInfo("WEIGHT", JDBCType.BIGINT); + + resultSetBuilder.addColumnInfo("MASTER", JDBCType.VARCHAR); + resultSetBuilder.addColumnInfo("LIMIT_SESSION_COUNT", JDBCType.BIGINT); + resultSetBuilder.addColumnInfo("REPLICA", JDBCType.VARCHAR); + Collection values = + replicaSelectorRuntime.getPhysicsInstances(); + Map dataSourceConfig = routerConfig.getDatasources().stream().collect(Collectors.toMap(k -> k.getName(), v -> v)); + + + for (PhysicsInstance instance : values) { + + String NAME = instance.getName(); + String TYPE = instance.getType().name(); + boolean READABLE = instance.asSelectRead(); + int SESSION_COUNT = instance.getSessionCounter(); + int WEIGHT = instance.getWeight(); + boolean ALIVE = instance.isAlive(); + boolean MASTER = instance.isMaster(); + + Optional e = Optional.ofNullable(dataSourceConfig.get(NAME)); + + + String replicaDataSourceSelectorList = String.join(",", replicaSelectorRuntime.getReplicaNameListByInstanceName(NAME)); + + resultSetBuilder.addObjectRowPayload( + Arrays.asList(NAME, ALIVE, READABLE, TYPE, SESSION_COUNT, WEIGHT, MASTER, + e.map(i -> i.getMaxCon()).orElse(-1), + replicaDataSourceSelectorList + )); + } + RowBaseIterator rowBaseIterator = resultSetBuilder.build(); + return rowBaseIterator; + } + @Nullable private Future showTables(Response response, String body, MetadataManager metadataManager, MycatRouterConfig routerConfig) { Map map = JsonUtil.from(body, Map.class);