Skip to content

Commit

Permalink
Merge pull request #25 from aceld/feature/aceld
Browse files Browse the repository at this point in the history
add golang golint, workerflow
  • Loading branch information
aceld authored Apr 16, 2024
2 parents 142622d + d0cd9e9 commit 52f8c29
Show file tree
Hide file tree
Showing 73 changed files with 852 additions and 803 deletions.
91 changes: 49 additions & 42 deletions common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,93 +2,100 @@ package common

import "time"

// 用户生成KisId的字符串前缀
// Prefix string for generating KisId by users
const (
KisIdTypeFlow = "flow"
KisIdTypeConnector = "conn"
KisIdTypeFunction = "func"
KisIdTypeGlobal = "global"
KisIdJoinChar = "-"
KisIDTypeFlow = "flow" // KisId type for Flow
KisIDTypeConnector = "conn" // KisId type for Connector
KisIDTypeFunction = "func" // KisId type for Function
KisIDTypeGlobal = "global" // KisId type for Global
KisIDJoinChar = "-" // Joining character for KisId
)

const (
// FunctionIdFirstVirtual 为首结点Function上一层虚拟的Function ID
FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
// FunctionIdLastVirtual 为尾结点Function下一层虚拟的Function ID
FunctionIdLastVirtual = "FunctionIdLastVirtual"
// FunctionIDFirstVirtual is the virtual Function ID for the first node Function
FunctionIDFirstVirtual = "FunctionIDFirstVirtual"
// FunctionIDLastVirtual is the virtual Function ID for the last node Function
FunctionIDLastVirtual = "FunctionIDLastVirtual"
)

// KisMode represents the mode of KisFunction
type KisMode string

const (
// V 为校验特征的KisFunction, 主要进行数据的过滤,验证,字段梳理,幂等等前置数据处理
// V is for Verify, which mainly performs data filtering, validation, field sorting, idempotence, etc.
V KisMode = "Verify"

// S 为存储特征的KisFunction, S会通过KisConnector进行将数据进行存储. S Function 会通过KisConnector进行数据存储,具备相同Connector的Function在逻辑上可以进行并流
// S is for Save, S Function will store data through KisConnector. Functions with the same Connector can logically merge.
S KisMode = "Save"

// L 为加载特征的KisFunction,L会通过KisConnector进行数据加载,L Function 会通过KisConnector进行数据读取,具备相同Connector的Function可以从逻辑上与对应的S Function进行并流
// L is for Load, L Function will load data through KisConnector. Functions with the same Connector can logically merge with corresponding S Function.
L KisMode = "Load"

// C 为计算特征的KisFunction, 可以生成新的字段,计算新的值,进行数据的聚合,分析等
// C is for Calculate, which can generate new fields, calculate new values, and perform data aggregation, analysis, etc.
C KisMode = "Calculate"

// E 为扩展特征的KisFunction,作为流式计算的自定义特征Function,也同时是KisFlow当前流中的最后一个Function,概念类似Sink。
// E is for Expand, which serves as a custom feature Function for stream computing and is also the last Function in the current KisFlow, similar to Sink.
E KisMode = "Expand"
)

/*
是否启动Flow
*/
// KisOnOff Whether to enable the Flow
type KisOnOff int

const (
FlowEnable KisOnOff = 1 // 启动
FlowDisable KisOnOff = 0 // 不启动
// FlowEnable Enabled
FlowEnable KisOnOff = 1
// FlowDisable Disabled
FlowDisable KisOnOff = 0
)

// KisConnType represents the type of KisConnector
type KisConnType string

const (
// REDIS is the type of Redis
REDIS KisConnType = "redis"
// MYSQL is the type of MySQL
MYSQL KisConnType = "mysql"
// KAFKA is the type of Kafka
KAFKA KisConnType = "kafka"
TIDB KisConnType = "tidb"
ES KisConnType = "es"
// TIDB is the type of TiDB
TIDB KisConnType = "tidb"
// ES is the type of Elasticsearch
ES KisConnType = "es"
)

// cache
const (
// DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间
DeFaultFlowCacheCleanUp = 5 // 单位 min
// DefaultExpiration 默认GoCahce时间 ,永久保存
// DeFaultFlowCacheCleanUp is the default cleanup time for Cache in KisFlow's Flow object Cache
DeFaultFlowCacheCleanUp = 5 // unit: min
// DefaultExpiration is the default time for GoCahce, permanent storage
DefaultExpiration time.Duration = 0
)

// metrics
const (
METRICS_ROUTE string = "/metrics"
MetricsRoute string = "/metrics"

LABEL_FLOW_NAME string = "flow_name"
LABEL_FLOW_ID string = "flow_id"
LABEL_FUNCTION_NAME string = "func_name"
LABEL_FUNCTION_MODE string = "func_mode"
LabelFlowName string = "flow_name"
LabelFlowID string = "flow_id"
LabelFunctionName string = "func_name"
LabelFunctionMode string = "func_mode"

COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
CounterKisflowDataTotalName string = "kisflow_data_total"
CounterKisflowDataTotalHelp string = "Total data volume of all KisFlow Flows"

GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow各个FlowID数据流的数据数量总量"
GamgeFlowDataTotalName string = "flow_data_total"
GamgeFlowDataTotalHelp string = "Total data volume of each KisFlow FlowID data stream"

GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
GANGE_FLOW_SCHE_CNTS_HELP string = "KisFlow各个FlowID被调度的次数"
GangeFlowScheCntsName string = "flow_schedule_cnts"
GangeFlowScheCntsHelp string = "Number of times each KisFlow FlowID is scheduled"

GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
GANGE_FUNC_SCHE_CNTS_HELP string = "KisFlow各个Function被调度的次数"
GangeFuncScheCntsName string = "func_schedule_cnts"
GangeFuncScheCntsHelp string = "Number of times each KisFlow Function is scheduled"

HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
HISTOGRAM_FUNCTION_DURATION_HELP string = "Function执行耗时"
HistogramFunctionDurationName string = "func_run_duration"
HistogramFunctionDurationHelp string = "Function execution time"

HISTOGRAM_FLOW_DURATION_NAME string = "flow_run_duration"
HISTOGRAM_FLOW_DURATION_HELP string = "Flow执行耗时"
HistogramFlowDurationName string = "flow_run_duration"
HistogramFlowDurationHelp string = "Flow execution time"
)
12 changes: 5 additions & 7 deletions common/data_type.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package common

// KisRow 一行数据
// KisRow represents a single row of data
type KisRow interface{}

// KisRowArr 一次业务的批量数据
// KisRowArr represents a batch of data for a single business operation
type KisRowArr []KisRow

/*
KisDataMap 当前Flow承载的全部数据
key : 数据所在的Function ID
value: 对应的KisRow
*/
// KisDataMap contains all the data carried by the current Flow
// key : Function ID where the data resides
// value: Corresponding KisRow
type KisDataMap map[string]KisRowArr
34 changes: 14 additions & 20 deletions config/kis_conn_config.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,38 @@
package config

import (
"errors"
"fmt"

"github.com/aceld/kis-flow/common"
)

// KisConnConfig KisConnector 策略配置
// KisConnConfig describes the KisConnector strategy configuration
type KisConnConfig struct {
//配置类型
KisType string `yaml:"kistype"`
//唯一描述标识
CName string `yaml:"cname"`
//基础存储媒介地址
AddrString string `yaml:"addrs"`
//存储媒介引擎类型"Mysql" "Redis" "Kafka"等
Type common.KisConnType `yaml:"type"`
//一次存储的标识:如Redis为Key名称、Mysql为Table名称,Kafka为Topic名称等
Key string `yaml:"key"`
//配置信息中的自定义参数
Params map[string]string `yaml:"params"`
//存储读取所绑定的NsFuncionID
KisType string `yaml:"kistype"` // Configuration type
CName string `yaml:"cname"` // Unique descriptive identifier
AddrString string `yaml:"addrs"` // Base storage medium address
Type common.KisConnType `yaml:"type"` // Storage medium engine type: "Mysql", "Redis", "Kafka", etc.
Key string `yaml:"key"` // Identifier for a single storage: Key name for Redis, Table name for Mysql, Topic name for Kafka, etc.
Params map[string]string `yaml:"params"` // Custom parameters in the configuration information

// NsFuncionID bound to storage reading
Load []string `yaml:"load"`
Save []string `yaml:"save"`
}

// NewConnConfig 创建一个KisConnector策略配置对象, 用于描述一个KisConnector信息
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
// NewConnConfig creates a KisConnector strategy configuration object, used to describe a KisConnector information
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param map[string]string) *KisConnConfig {
strategy := new(KisConnConfig)
strategy.CName = cName
strategy.AddrString = addr

strategy.Type = t
strategy.Key = key
strategy.Params = param

return strategy
}

// WithFunc Connector与Function进行关系绑定
// WithFunc binds Connector to Function
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {

switch common.KisMode(fConfig.FMode) {
Expand All @@ -47,7 +41,7 @@ func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
case common.L:
cConfig.Load = append(cConfig.Load, fConfig.FName)
default:
return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
return fmt.Errorf("Wrong KisMode %s", fConfig.FMode)
}

return nil
Expand Down
12 changes: 6 additions & 6 deletions config/kis_flow_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package config

import "github.com/aceld/kis-flow/common"

// KisFlowFunctionParam 一个Flow配置中Function的Id及携带固定配置参数
// KisFlowFunctionParam represents the Id of a Function and carries fixed configuration parameters in a Flow configuration
type KisFlowFunctionParam struct {
FuncName string `yaml:"fname"` //必须
Params FParam `yaml:"params"` //选填,在当前Flow中Function定制固定配置参数
FuncName string `yaml:"fname"` // Required
Params FParam `yaml:"params"` // Optional, custom fixed configuration parameters for the Function in the current Flow
}

// KisFlowConfig 用户贯穿整条流式计算上下文环境的对象
// KisFlowConfig represents the object that spans the entire stream computing context environment
type KisFlowConfig struct {
KisType string `yaml:"kistype"`
Status int `yaml:"status"`
FlowName string `yaml:"flow_name"`
Flows []KisFlowFunctionParam `yaml:"flows"`
}

// NewFlowConfig 创建一个Flow策略配置对象, 用于描述一个KisFlow信息
// NewFlowConfig creates a Flow strategy configuration object, used to describe a KisFlow information
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
config := new(KisFlowConfig)
config.FlowName = flowName
Expand All @@ -27,7 +27,7 @@ func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
return config
}

// AppendFunctionConfig 添加一个Function Config 到当前Flow中
// AppendFunctionConfig adds a Function Config to the current Flow
func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {
fConfig.Flows = append(fConfig.Flows, params)
}
43 changes: 23 additions & 20 deletions config/kis_func_config.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
package config

import (
"errors"
"fmt"

"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/log"
)

// FParam 在当前Flow中Function定制固定配置参数类型
// FParam represents the type for custom fixed configuration parameters for the Function in the current Flow
type FParam map[string]string

// KisSource 表示当前Function的业务源
// KisSource represents the business source of the current Function
type KisSource struct {
Name string `yaml:"name"` // 本层Function的数据源描述
Must []string `yaml:"must"` // source必传字段
Name string `yaml:"name"` // Description of the data source for this layer Function
Must []string `yaml:"must"` // Required fields for the source
}

// KisFuncOption 可选配置
// KisFuncOption represents optional configurations
type KisFuncOption struct {
CName string `yaml:"cname"` // 连接器Connector名称
RetryTimes int `yaml:"retry_times"` // 选填,Function调度重试(不包括正常调度)最大次数
RetryDuration int `yaml:"return_duration"` // 选填,Function调度每次重试最大时间间隔(单位:ms)
Params FParam `yaml:"default_params"` // 选填,在当前Flow中Function定制固定配置参数
CName string `yaml:"cname"` // Connector name
RetryTimes int `yaml:"retry_times"` // Optional, maximum retry times for Function scheduling (excluding normal scheduling)
RetryDuration int `yaml:"return_duration"` // Optional, maximum time interval for each retry in Function scheduling (unit: ms)
Params FParam `yaml:"default_params"` // Optional, custom fixed configuration parameters for the Function in the current Flow
}

// KisFuncConfig 一个KisFunction策略配置
// KisFuncConfig represents a KisFunction strategy configuration
type KisFuncConfig struct {
KisType string `yaml:"kistype"`
FName string `yaml:"fname"`
Expand All @@ -33,7 +34,7 @@ type KisFuncConfig struct {
connConf *KisConnConfig
}

// NewFuncConfig 创建一个Function策略配置对象, 用于描述一个KisFunction信息
// NewFuncConfig creates a Function strategy configuration object, used to describe a KisFunction information
func NewFuncConfig(
funcName string, mode common.KisMode,
source *KisSource, option *KisFuncOption) *KisFuncConfig {
Expand All @@ -53,13 +54,13 @@ func NewFuncConfig(
config.FMode = string(mode)

/*
// FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系
// Functions S and L require the KisConnector parameters to be passed as they need to establish streaming relationships through Connector
if mode == common.S || mode == common.L {
if option == nil {
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
log.Logger().ErrorF("Function S/L needs option->Cid\n")
return nil
} else if option.CName == "" {
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
log.Logger().ErrorF("Function S/L needs option->Cid\n")
return nil
}
}
Expand All @@ -72,26 +73,28 @@ func NewFuncConfig(
return config
}

// AddConnConfig WithConn binds Function to Connector
func (fConf *KisFuncConfig) AddConnConfig(cConf *KisConnConfig) error {
if cConf == nil {
return errors.New("KisConnConfig is nil")
return fmt.Errorf("KisConnConfig is nil")
}

// Function需要和Connector进行关联
// Function needs to be associated with Connector
fConf.connConf = cConf

// Connector需要和Function进行关联
// Connector needs to be associated with Function
_ = cConf.WithFunc(fConf)

// 更新Function配置中的CName
// Update CName in Function configuration
fConf.Option.CName = cConf.CName

return nil
}

// GetConnConfig gets the Connector configuration
func (fConf *KisFuncConfig) GetConnConfig() (*KisConnConfig, error) {
if fConf.connConf == nil {
return nil, errors.New("KisFuncConfig.connConf not set")
return nil, fmt.Errorf("KisFuncConfig.connConf not set")
}

return fConf.connConf, nil
Expand Down
11 changes: 6 additions & 5 deletions config/kis_global_config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package config

// KisGlobalConfig represents the global configuration for KisFlow
type KisGlobalConfig struct {
//kistype Global为kisflow的全局配置
// KisType Global is the global configuration for kisflow
KisType string `yaml:"kistype"`
//是否启动prometheus监控
// EnableProm indicates whether to start Prometheus monitoring
EnableProm bool `yaml:"prometheus_enable"`
//是否需要kisflow单独启动端口监听
// PrometheusListen indicates whether kisflow needs to start a separate port for listening
PrometheusListen bool `yaml:"prometheus_listen"`
//prometheus取点监听地址
// PrometheusServe is the address for Prometheus scraping
PrometheusServe string `yaml:"prometheus_serve"`
}

// GlobalConfig 默认全局配置,全部均为关闭
// GlobalConfig is the default global configuration, all are set to off
var GlobalConfig = new(KisGlobalConfig)
Loading

0 comments on commit 52f8c29

Please sign in to comment.