Skip to content

Commit

Permalink
Merge pull request #106 from wolfboys/master
Browse files Browse the repository at this point in the history
1.0.0 beta 1
  • Loading branch information
wolfboys authored Apr 18, 2021
2 parents 2606b7c + 84630d2 commit 180a531
Show file tree
Hide file tree
Showing 33 changed files with 427 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ object ConfigConst {

def KEY_FLINK_HOME(prefix: String = null): String = if (prefix == null) "flink.home" else s"${prefix}flink.home"

def KEY_FLINK_CONF(prefix: String = null): String = if (prefix == null) "flink.conf" else s"${prefix}flink.conf"

def KEY_APP_NAME(prefix: String = null): String = if (prefix == null) "app.name" else s"${prefix}app.name"

def KEY_FLINK_SQL(prefix: String = null): String = if (prefix == null) "sql" else s"${prefix}sql"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ CREATE TABLE `t_app_backup` (
`APP_ID` bigint(20) DEFAULT NULL,
`SQL_ID` bigint(20) DEFAULT NULL,
`CONFIG_ID` bigint(20) DEFAULT NULL,
`VERSION` int(10) DEFAULT NULL,
`PATH` varchar(255) DEFAULT NULL,
`DESCRIPTION` varchar(255) DEFAULT NULL,
`CREATE_TIME` datetime DEFAULT NULL,
Expand All @@ -36,7 +37,7 @@ CREATE TABLE `t_flame_graph` (
PRIMARY KEY (`ID`) USING BTREE,
KEY `INX_TIME` (`TIMELINE`) USING BTREE,
KEY `INX_APPID` (`APP_ID`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1379910128636960771 DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of t_flame_graph
Expand Down Expand Up @@ -84,13 +85,13 @@ CREATE TABLE `t_flink_app` (
PRIMARY KEY (`ID`) USING BTREE,
KEY `INX_STATE` (`STATE`) USING BTREE,
KEY `INX_TRACK` (`TRACKING`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1381548268640436226 DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB AUTO_INCREMENT=1383720958407139331 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of t_flink_app
-- ----------------------------
BEGIN;
INSERT INTO `t_flink_app` VALUES (1381548268640436225, 2, 4, NULL, 'Flink Sql Test', NULL, NULL, NULL, NULL, '{\"jobmanager.memory.process.size\":\"1024mb\",\"taskmanager.memory.process.size\":\"1024mb\",\"parallelism.default\":1,\"taskmanager.numberOfTaskSlots\":1}', 1, NULL, 1, NULL, NULL, '2', NULL, NULL, 0, 0, NULL, NULL, NULL, NULL, NULL, NULL, 0, 0, '2021-04-12 18:02:30', 0, NULL, NULL);
INSERT INTO `t_flink_app` VALUES (1383720958407139330, 2, 4, NULL, 'Flink Sql', NULL, NULL, NULL, '', '{\"jobmanager.memory.process.size\":\"1024mb\",\"taskmanager.memory.process.size\":\"1024mb\",\"parallelism.default\":1,\"taskmanager.numberOfTaskSlots\":1}', 1, 'application_1608425563635_146822', 1, 6609957, 'fee72e948e804b8e2aeec20db6a19183', '7', '', 'Flink Sql', 0, 0, 1024, 1024, 1, 1, 1, 0, 0, 1, '2021-04-18 17:55:59', 3, '2021-04-18 18:31:56', NULL);
COMMIT;

-- ----------------------------
Expand All @@ -106,7 +107,7 @@ CREATE TABLE `t_flink_config` (
`CONTENT` text NOT NULL,
`CREATE_TIME` datetime DEFAULT NULL,
PRIMARY KEY (`ID`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB AUTO_INCREMENT=1383318218421993474 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of t_flink_config
Expand All @@ -126,13 +127,12 @@ CREATE TABLE `t_flink_effective` (
`CREATE_TIME` datetime DEFAULT NULL,
PRIMARY KEY (`ID`) USING BTREE,
UNIQUE KEY `UN_INX` (`APP_ID`,`TARGET_TYPE`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1381548275443597314 DEFAULT CHARSET=utf8mb4;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of t_flink_effective
-- ----------------------------
BEGIN;
INSERT INTO `t_flink_effective` VALUES (1381548275443597313, 1381548268640436225, 2, 1381548268724322305, NULL);
COMMIT;

-- ----------------------------
Expand Down Expand Up @@ -180,7 +180,7 @@ CREATE TABLE `t_flink_project` (
-- Records of t_flink_project
-- ----------------------------
BEGIN;
INSERT INTO `t_flink_project` VALUES (1, 'streamx-quickstart', 'https://gitee.com/benjobs/streamx-quickstart.git', 'main', NULL, NULL, NULL, 1, 1, '2021-04-08 05:01:02', '2021-04-12 17:59:36', 'streamx-quickstart', 1);
INSERT INTO `t_flink_project` VALUES (1, 'streamx-quickstart', 'https://gitee.com/benjobs/streamx-quickstart.git', 'main', NULL, NULL, NULL, 1, 1, '2021-04-08 05:01:02', '2021-04-17 11:47:39', 'streamx-quickstart', 1);
COMMIT;

-- ----------------------------
Expand All @@ -194,7 +194,7 @@ CREATE TABLE `t_flink_savepoint` (
`SAVE_POINT` varchar(255) DEFAULT NULL,
`CREATE_TIME` datetime DEFAULT NULL,
PRIMARY KEY (`ID`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1379910126837604354 DEFAULT CHARSET=utf8;
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t_flink_savepoint
Expand All @@ -212,7 +212,7 @@ CREATE TABLE `t_flink_sql` (
`SQL` text,
`DEPENDENCY` text,
`VERSION` int(20) DEFAULT NULL,
`LATEST` tinyint(1) NOT NULL DEFAULT '0',
`CANDIDATE` int(1) NOT NULL DEFAULT '0',
`CREATE_TIME` datetime DEFAULT NULL,
PRIMARY KEY (`ID`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Expand All @@ -221,7 +221,6 @@ CREATE TABLE `t_flink_sql` (
-- Records of t_flink_sql
-- ----------------------------
BEGIN;
INSERT INTO `t_flink_sql` VALUES (1381548268724322305, 1381548268640436225, 'eNqNVM9PGkEUvu9fMbeFhsUF+kuMBwQUEgEDa5uezMAusrow25lZLLeaGGNbrR5o05q20dRGDw320miBpv8Mu9BT/4XO7ig/atN0b+973/fezPvebDyfjClJoMTmFpPAIhpeMdAq8AmAfV6oq+BBLB9PxfIBD9SpVr0BliDVVhFu3EgUtQqs6whPopQAJZ1JFpRYZskX8Qt+8DCtpFhXsYRqNa1EEQ7ShqmJYBaI67C8DsUAkCTQ+/6j3zwDHgKG1HFVXcNERzVPaNV0N4QGE7tqLus/23Heff7V3R2mgdM8d3Y3gRwMhUCv86l3+bx/uMV5EydCpl7ild3BXF9torrHGReZGJkaprpGgkWEKKEYmkEmd1uLsyLVCJUqUEXIlO5Fp+XpcGAcun8TmvYgMTDehFCIqWVKVaTymWkQG7orQuUy0ejV9DovB18v7NMXgKPA7j5l0eC8Y++/FkTLVJmLoxrQNLWa6jYqI1yFdOTIGmET5rd2Xn1x9lpO+8A56trd/d5lG7jZoUbVMJuyREoVrQo9McUWq8KPAxKJRcBzoH/csluHnhoMTj86Hw4Gp1v2zlvBPyMI8fEtNetWfYXotfWrNVXp5HaZdTCXXkhnFR5a1+G/12xNLZYmtswF/r5kFjaGkmi1QR4b0akpA5WgUUGERiMR+e6Uaxkv59VhkomusGjwtsPLcPLg+Mw+2JtoxpalBqucjdkKcSI7obNz8QfXhIRsIKx63FA4EgmHw7fDXGCfb/ePNsfZG5i95WDZsEglWIVPJIw2CFd6/vzsvBm0Tu7Isuy8P2bvhXnba2873Wb/pO00v7EwxBKePelsIZlXAJtxbuSOUEguJuMK8yDBvFuZz+UzMcVHSQCIDfZJmYykqiCVisqy6Gcmum7Fc8tZxXfLD2IFVmiEJNIFJZ2NK9e/JI9g1YX5fC4z/GsJC/nc8hKYe/Q/DWeE30u2w2Q=', '{\"pom\":[{\"groupId\":\"mysql\",\"artifactId\":\"mysql-connector-java\",\"version\":\"5.1.48\",\"exclusions\":[]},{\"groupId\":\"org.apache.flink\",\"artifactId\":\"flink-sql-connector-kafka_2.12\",\"version\":\"1.12.0\",\"exclusions\":[]},{\"groupId\":\"org.apache.flink\",\"artifactId\":\"flink-connector-jdbc_2.11\",\"version\":\"1.12.0\",\"exclusions\":[]},{\"groupId\":\"org.apache.flink\",\"artifactId\":\"flink-json\",\"version\":\"1.12.0\",\"exclusions\":[]}]}', 1, 0, '2021-04-12 18:02:30');
COMMIT;

-- ----------------------------
Expand Down Expand Up @@ -423,8 +422,9 @@ CREATE TABLE `t_setting` (
-- Records of t_setting
-- ----------------------------
BEGIN;
INSERT INTO `t_setting` VALUES ('env.flink.home', NULL, 'Flink Home', 'Flink Home');
INSERT INTO `t_setting` VALUES ('maven.central.repository', '', 'Maven Central Repository', 'Maven 私服地址');
INSERT INTO `t_setting` VALUES ('streamx.console.webapp.address', 'http://test-hadoop-2:10001', 'StreamX Webapp address', 'StreamX Console Web 应用程序 HTTP 端口');
INSERT INTO `t_setting` VALUES ('streamx.console.webapp.address', 'http://test-hadoop-2:10000', 'StreamX Webapp address', 'StreamX Console Web 应用程序 HTTP 端口');
INSERT INTO `t_setting` VALUES ('streamx.console.workspace', '/streamx/workspace', 'StreamX Console Workspace', 'StreamX Console 的工作空间,用于存放项目源码,编译后的项目等');
COMMIT;

Expand Down Expand Up @@ -457,7 +457,7 @@ CREATE TABLE `t_user` (
-- Records of t_user
-- ----------------------------
BEGIN;
INSERT INTO `t_user` VALUES (1, 'admin', '', 'ats6sdxdqf8vsqjtz0utj461wr', '829b009a6b9cc8ea486a4abbc38e56529f3c6f4c9c6fcd3604b41b1d6eca1a57', 1, '[email protected]', '18500193260', '1', '2017-12-27 15:47:19', '2019-08-09 15:42:57', '2021-04-12 18:06:31', '0', 'author。', 'ubnKSIfAJTxIgXOKlciN.png', '1');
INSERT INTO `t_user` VALUES (1, 'admin', '', 'ats6sdxdqf8vsqjtz0utj461wr', '829b009a6b9cc8ea486a4abbc38e56529f3c6f4c9c6fcd3604b41b1d6eca1a57', 1, '[email protected]', '18500193260', '1', '2017-12-27 15:47:19', '2019-08-09 15:42:57', '2021-04-18 09:57:19', '0', 'author。', 'ubnKSIfAJTxIgXOKlciN.png', '1');
COMMIT;

-- ----------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ public RestResponse mapping(Application app) {
public RestResponse deploy(Application app) {
Application application = applicationService.getById(app.getId());
assert application != null;

application.setRestart(app.getRestart());
application.setSavePointed(app.getSavePointed());
application.setAllowNonRestored(app.getAllowNonRestored());
application.setBackUp(true);
application.setBackUpDescription(app.getBackUpDescription());
applicationService.deploy(application);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.streamxhub.streamx.console.core.entity.ApplicationBackUp;
import com.streamxhub.streamx.console.core.service.ApplicationBackUpService;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
Expand All @@ -42,4 +41,8 @@ public interface ApplicationBackUpMapper extends BaseMapper<ApplicationBackUp> {

@Delete("delete from t_app_backup where app_id=#{appId}")
void removeApp(@Param("appId")Long appId);

@Select("SELECT * from t_app_backup where app_id=#{appId} and sql_id=#{sqlId}")
ApplicationBackUp getFlinkSqlBackup(@Param("appId")Long appId,@Param("sqlId") Long sqlId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ public interface FlinkSqlMapper extends BaseMapper<FlinkSql> {

@Delete("delete from t_flink_sql where app_id=#{appId}")
void removeApp(@Param("appId")Long appId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@
import com.streamxhub.streamx.common.util.HttpClientUtils;
import com.streamxhub.streamx.console.base.utils.SpringContextUtil;
import com.streamxhub.streamx.console.core.enums.ApplicationType;
import com.streamxhub.streamx.console.core.enums.ChangedType;
import com.streamxhub.streamx.console.core.enums.DeployState;
import com.streamxhub.streamx.console.core.enums.FlinkAppState;
import com.streamxhub.streamx.console.core.metrics.flink.JobsOverview;
import com.streamxhub.streamx.console.core.metrics.flink.Overview;
import com.streamxhub.streamx.console.core.metrics.yarn.AppInfo;
import com.streamxhub.streamx.console.core.service.SettingService;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -146,17 +150,18 @@ public class Application implements Serializable {
private transient String dependency;
private transient Long sqlId;
private transient String flinkSql;
private transient Boolean backUp;
private transient Boolean restart;

private transient Boolean backUp = false;
private transient Boolean restart = false;
private transient String userName;
private transient String config;
private transient Long configId;
private transient String confPath;
private transient Integer format;
private transient String savePoint;
private transient Boolean savePointed;
private transient Boolean drain;
private transient Boolean allowNonRestored;
private transient Boolean savePointed = false;
private transient Boolean drain = false;
private transient Boolean allowNonRestored = false;
private transient String projectName;
private transient String createTimeFrom;
private transient String createTimeTo;
Expand Down Expand Up @@ -304,6 +309,11 @@ public boolean isRunning() {
return FlinkAppState.RUNNING.getValue() == this.getState();
}

@JsonIgnore
public boolean isNeedRollback() {
return DeployState.NEED_ROLLBACK.get() == this.getDeploy();
}

@Data
public static class Dependency {
private List<Pom> pom = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@
public class ApplicationBackUp {
private Long id;
private Long appId;
/**
* 当 jobType为2时该值为FlinkSQL的id
*/
private Long sqlId;
private Long configId;
private String path;
private String description;
/**
* 备份时的版本号.
*/
private Integer version;
private Date createTime;

private transient boolean backup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class ApplicationConfig {
private Long appId;

/**
* 1)yaml 2)prop
* 1)yaml <br>
* 2)prop
*/
private Integer format;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.streamxhub.streamx.common.util.DeflaterUtils;
import com.streamxhub.streamx.console.core.enums.ChangedType;
import lombok.Data;
import net.minidev.json.annotate.JsonIgnore;

Expand All @@ -45,9 +46,9 @@ public class FlinkSql {

/**
* 候选版本:
* 0: 非候选
* 1: 新增的记录成为候选版本
* 2: 指定历史记录的版本成为候选版本
* 0: 非候选 <br>
* 1: 新增的记录成为候选版本 <br>
* 2: 指定历史记录的版本成为候选版本 <br>
*/
private Integer candidate;

Expand Down Expand Up @@ -85,6 +86,25 @@ public void setToApplication(Application application) {
application.setSqlId(this.id);
}

public ChangedType checkChange(FlinkSql target) {
// 1) 判断sql语句是否发生变化
boolean sqlDifference = !this.getSql().trim().equals(target.getSql().trim());
// 2) 判断 依赖是否发生变化
Application.Dependency thisDependency = Application.Dependency.jsonToDependency(this.getDependency());
Application.Dependency targetDependency = Application.Dependency.jsonToDependency(target.getDependency());
boolean depDifference = !thisDependency.eq(targetDependency);
if (sqlDifference && depDifference) {
return ChangedType.ALL;
}
if (sqlDifference) {
return ChangedType.SQL;
}
if (depDifference) {
return ChangedType.DEPENDENCY;
}
return ChangedType.NONE;
}

public void base64Encode() {
this.sql = Base64.getEncoder().encodeToString(DeflaterUtils.unzipString(this.sql).getBytes());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2019 The StreamX Project
* <p>
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.streamxhub.streamx.console.core.enums;

import java.util.Arrays;

/**
* @author benjobs
*/

public enum ChangedType {
/**
* 未发生变化
*/
NONE(0),

/**
* 依赖发生变化
*/
DEPENDENCY(1),

/**
* flink Sql发生变化
*/
SQL(2),

/**
* 依赖和sql都发生变化,BOTH
*/
ALL(3);


int value;

ChangedType(int value) {
this.value = value;
}

public int get() {
return this.value;
}

public static ChangedType of(Integer value) {
return Arrays.stream(values()).filter((x) -> x.value == value).findFirst().orElse(null);
}

public boolean noChanged() {
return this.equals(NONE);
}

public boolean hasChanged() {
return !noChanged();
}

public boolean isSqlChanged() {
return this.equals(SQL);
}

public boolean isDependencyChanged() {
return this.equals(DEPENDENCY);
}

public boolean isAllChanged() {
return this.equals(ALL);
}

@Override
public String toString() {
switch (this) {
case NONE:
return "[NONE], nothing to changed";
case DEPENDENCY:
return "[DEPENDENCY], Dependency is changed";
case SQL:
return "[SQL], Flink Sql is changed";
case ALL:
return "[ALL], Dependency and Flink Sql all changed";
default:
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ public enum DeployState {
*/
NEED_RESTART_AFTER_DEPLOY(6),

//需要回滚
NEED_ROLLBACK(7),

/**
* 回滚完成,需要重新启动
*/
NEED_RESTART_AFTER_ROLLBACK(7),
NEED_RESTART_AFTER_ROLLBACK(8),

/**
* 发布的任务已经撤销
Expand Down
Loading

0 comments on commit 180a531

Please sign in to comment.