Skip to content

Commit

Permalink
make flink job can be deploy in kubernete-session environment
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Jan 6, 2024
1 parent 1ee3fbd commit 9f6b8ae
Show file tree
Hide file tree
Showing 22 changed files with 4,350 additions and 3,123 deletions.
59 changes: 35 additions & 24 deletions .idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"build:watch": "tsc -p ./src/ -w",
"build:e2e": "tsc -p e2e/",
"ng:serve-aot": "npm run build-project:sdk && npm run build-project:vis && ng build --aot ---configuration production ",
"ng:serve-jit": "export NODE_OPTIONS=--openssl-legacy-provider && ng serve --proxy-config proxy.conf.json",
"ng:serve-jit-arm64": "export NODE_OPTIONS=--openssl-legacy-provider && ng serve --proxy-config proxy.conf.json",
"ng:serve-jit-amd64": " ng serve --proxy-config proxy.conf.json",
"build:aot": "ngc -p tsconfig-aot.json && rollup -c rollup-config.js_bak",
"rollup": "rollup -c rollup-config.js_bak",
"serve": "lite-server -c=bs-config.json",
Expand Down
90 changes: 85 additions & 5 deletions src/base/base.manage-routing.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,115 @@ import {BaseConfigComponent} from "./base-config.component";
import {SnapshotsetComponent} from "../common/snapshotset.component";
import {SchemaEditVisualizingModelComponent, SchemaXmlEditComponent} from "../corecfg/schema-xml-edit.component";
import {DataxAddComponent} from "./datax.add.component";
import {DataxWorkerComponent} from "./datax.worker.component";
import {ProcessMeta} from "../runtime/misc/RCDeployment";
import {DataxWorkerComponent, PowerjobCptType} from "./datax.worker.component";
import {DataxWorkerDTO, ProcessMeta} from "../runtime/misc/RCDeployment";
import {PluginManageComponent} from "./plugin.manage.component";
import {StepType} from "../common/steps.component";
import {ErrorListComponent} from "./error.list.component";
import {NotebookwrapperComponent} from "../common/plugins.component";
import {Descriptor, SavePluginEvent} from "../common/tis.plugin";
import {DataxWorkerAddStep0Component} from "./datax.worker.add.step0.component";
import {K8SRCSpec} from "../common/k8s.replics.spec.component";


export const dataXWorkerCfg: { processMeta: ProcessMeta }
= {
processMeta: {
step1CreateSaveEvent: (step1) => {
let e = new SavePluginEvent(true);
e.serverForward = "coredefine:datax_action:save_datax_worker";
e.postPayload = {"k8sSpec": step1.dto.primaryRCSpec};
return e;
},
relaunchClusterMethod: "relaunch_datax_worker",
launchClusterMethod: "launch_datax_worker",
targetName: "datax-worker"
, pageHeader: "PowerJob分布式执行器"
, notCreateTips: "还未创建PowerJob执行器,创建之后可以将DataX构建任务提交到K8S PowerJob集群,高效并行执行分布式数据同步任务"
, createButtonLabel: "创建PowerJob执行器"
//, createButtonLabel: "创建PowerJob执行器"
, stepsType: StepType.CreateWorkderOfDataX
, supportK8SReplicsSpecSetter: true
, supportK8SReplicsSpecSetter: true,
step1Buttons: [
{
label: '创建PowerJob执行器', click: (step1) => {
step1.onClick();
}
},
{
label: '接入已有PowerJob集群', click: (step1) => {
step1.onClickAddExistPowerjobCluster();
}
}
]
, step0InitDescriptorProcess: (cpt: DataxWorkerAddStep0Component, desc: Array<Descriptor>) => {
cpt.initPowerJobRelevantProperties(desc);
}
, step1HeteroGetter: (dto: DataxWorkerDTO) => {
return dto.powderJobServerHetero;
}
, confirmStepCpts: [
{
cptType: PowerjobCptType.Server,
cptShow: (dto: DataxWorkerDTO) => true,
cpuMemorySpecGetter: (dto: DataxWorkerDTO) => {
if (dto.usingPowderJobUseExistCluster) {
return null;
}
return dto.powderJobWorkerRCSpec;
}
}
, {
cptType: PowerjobCptType.Worker,
cptShow: (dto: DataxWorkerDTO) => !dto.usingPowderJobUseExistCluster,
cpuMemorySpecGetter: (dto: DataxWorkerDTO) => dto.powderJobWorkerRCSpec
}
, {
cptType: PowerjobCptType.JobTpl,
cptShow: (dto: DataxWorkerDTO) => true,
cpuMemorySpecGetter: (dto: DataxWorkerDTO) => null
}
]
}
};

const step1FlinkCreateSaveEvent = new SavePluginEvent(true);

const flinkClusterCfg: { processMeta: ProcessMeta }
= {
processMeta: {
step1CreateSaveEvent: (step1) => {
return step1FlinkCreateSaveEvent;
},
launchClusterMethod: "Launch_flink_cluster",
relaunchClusterMethod: "relaunch_flink_cluster",
targetName: "flink-cluster"
, pageHeader: "Flink Native Cluster执行器"
, createButtonLabel: "创建Flink Native Cluster执行器"
// , createButtonLabel: "创建Flink Native Cluster执行器"
, notCreateTips: "还未创建Flink Native Cluster执行器,创建之后可以将Flink Job提交到K8S集群,高效并行执行数据实时同步任务"
, stepsType: StepType.CreateFlinkCluster
, supportK8SReplicsSpecSetter: false
, step1Buttons: [
{
label: '创建Flink Native Cluster执行器', click: (step1) => {
step1.onClick();
}
}
]
, step0InitDescriptorProcess: (cpt: DataxWorkerAddStep0Component, desc: Array<Descriptor>) => {
cpt.initFlinkClusterRelevantProperties(desc);
}
, step1HeteroGetter: (dto: DataxWorkerDTO) => {
return dto.flinkClusterHetero;
}
, confirmStepCpts: [
{
cptType: PowerjobCptType.FlinkCluster,
cptShow: (dto: DataxWorkerDTO) => true,
cpuMemorySpecGetter: (dto: DataxWorkerDTO) => {
return null;
}
}
]
}
};

Expand Down
15 changes: 9 additions & 6 deletions src/base/base.manage.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,29 @@ import {DataxWorkerComponent} from "./datax.worker.component";
import {DataxWorkerAddStep1Component} from "./datax.worker.add.step1.component";
import {DataxWorkerAddStep0Component} from "./datax.worker.add.step0.component";
import {DataxWorkerAddStep2Component} from "./datax.worker.add.step2.component";
import {DataxWorkerAddStep3Component} from "./datax.worker.add.step3.component";
import {DataxWorkerRunningComponent} from "./datax.worker.running.component";
import {
DataxWorkerAddStep3Component,
LaunchK8SClusterWaittingProcessComponent
} from "./datax.worker.add.step3.component";
import {DataxWorkerRunningComponent, PodsListComponent, RCSpecComponent} from "./datax.worker.running.component";

import {MarkdownModule} from "ngx-markdown";
import {ErrorListComponent} from "./error.list.component";
import {DataxWorkerAddStep22Component} from "./datax.worker.add.step2-2.component";
import {DataxWorkerAddExistPowerjobClusterComponent} from "./datax.worker.add.exist.powerjob.cluster.component";

import {NgTerminalModule} from "ng-terminal";


@NgModule({
id: 'basemanage',
imports: [MarkdownModule.forChild(), CommonModule, FormsModule, BaseMangeRoutingModule, TisCommonModule, NzStepsModule, NzInputModule, NzButtonModule, NzTabsModule],
imports: [MarkdownModule.forChild(), CommonModule, FormsModule, BaseMangeRoutingModule, TisCommonModule, NzStepsModule, NzInputModule, NzButtonModule, NzTabsModule, NgTerminalModule],
declarations: [
ApplistComponent, ErrorListComponent, DepartmentAddComponent, BaseMangeIndexComponent, BaseConfigComponent, DepartmentListComponent, AddGlobalParamComponent, GlobalUpdateParamComponent
, AddAppFormComponent, AddAppStepFlowComponent, AddAppFlowDirective, AddAppConfirmComponent, AddappSelectNodesComponent
, DataxWorkerComponent, DataxWorkerAddStep1Component, DataxWorkerAddStep0Component, DataxWorkerAddStep2Component
, DataxWorkerAddExistPowerjobClusterComponent
, DataxWorkerAddStep22Component ,DataxWorkerAddStep3Component
, DataxWorkerRunningComponent
, DataxWorkerAddStep22Component, DataxWorkerAddStep3Component
, DataxWorkerRunningComponent, PodsListComponent, RCSpecComponent, LaunchK8SClusterWaittingProcessComponent
],
entryComponents: [ApplistComponent
, BaseMangeIndexComponent, DepartmentListComponent, AddGlobalParamComponent
Expand Down
6 changes: 3 additions & 3 deletions src/base/datax.add.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ export class DataxAddComponent extends AppFormComponent implements AfterViewInit
=======================================================*/
// DataxAddStep2Component.getDataXReaderWriterEnum(this,new AddStep2ComponentCfg()).then((rwEnum: DataXReaderWriterEnum) => {
// let dto = new DataxDTO();
// dto.dataxPipeName = "mongo_mysql";
// dto.dataxPipeName = "mysql_mysql4";
// dto.processMeta = {readerRDBMS: true, explicitTable: true, writerRDBMS: true, writerSupportMultiTab: false};
// // dto.readerDescriptor = rwEnum.readerDescs.find((r) => "OSS" === r.displayName);
// // dto.writerDescriptor = rwEnum.writerDescs.find((r) => "Elasticsearch" === r.displayName);
// dto.readerDescriptor = rwEnum.readerDescs.find((r) => "MongoDB" === r.displayName);
// dto.readerDescriptor = rwEnum.readerDescs.find((r) => "MySQL" === r.displayName);
// dto.writerDescriptor = rwEnum.writerDescs.find((r) => "MySQL" === r.displayName);
// this.multiViewDAG.loadComponent(DataxAddStep2Component, dto);
// this.multiViewDAG.loadComponent(DataxAddStep4Component, dto);
// });
/**=====================================================
* for test end>>>>>>>>
Expand Down
3 changes: 2 additions & 1 deletion src/base/datax.add.step4.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ export class SelectedTabsComponent extends BasicFormComponent {
}
});
drawerRef.afterClose.subscribe(hetero => {
// console.log("close");
if (!hetero) {
return;
}
Expand Down Expand Up @@ -766,7 +767,7 @@ export class PluginSubFormComponent {
}

verifyPluginConfig(e: PluginSaveResponse) {
if (e.saveSuccess) {
if (e.saveSuccess && e.verify) {
this.drawer.close({hetero: this.hetero[0]});
}
}
Expand Down
Loading

0 comments on commit 9f6b8ae

Please sign in to comment.