Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add golang golint, workerflow #25

Merged
merged 12 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 49 additions & 42 deletions common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,93 +2,100 @@ package common

import "time"

// 用户生成KisId的字符串前缀
// Prefix string for generating KisId by users
const (
KisIdTypeFlow = "flow"
KisIdTypeConnector = "conn"
KisIdTypeFunction = "func"
KisIdTypeGlobal = "global"
KisIdJoinChar = "-"
KisIDTypeFlow = "flow" // KisId type for Flow
KisIDTypeConnector = "conn" // KisId type for Connector
KisIDTypeFunction = "func" // KisId type for Function
KisIDTypeGlobal = "global" // KisId type for Global
KisIDJoinChar = "-" // Joining character for KisId
)

const (
// FunctionIdFirstVirtual 为首结点Function上一层虚拟的Function ID
FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
// FunctionIdLastVirtual 为尾结点Function下一层虚拟的Function ID
FunctionIdLastVirtual = "FunctionIdLastVirtual"
// FunctionIDFirstVirtual is the virtual Function ID for the first node Function
FunctionIDFirstVirtual = "FunctionIDFirstVirtual"
// FunctionIDLastVirtual is the virtual Function ID for the last node Function
FunctionIDLastVirtual = "FunctionIDLastVirtual"
)

// KisMode represents the mode of KisFunction
type KisMode string

const (
// V 为校验特征的KisFunction, 主要进行数据的过滤,验证,字段梳理,幂等等前置数据处理
// V is for Verify, which mainly performs data filtering, validation, field sorting, idempotence, etc.
V KisMode = "Verify"

// S 为存储特征的KisFunction, S会通过KisConnector进行将数据进行存储. S Function 会通过KisConnector进行数据存储,具备相同Connector的Function在逻辑上可以进行并流
// S is for Save, S Function will store data through KisConnector. Functions with the same Connector can logically merge.
S KisMode = "Save"

// L 为加载特征的KisFunction,L会通过KisConnector进行数据加载,L Function 会通过KisConnector进行数据读取,具备相同Connector的Function可以从逻辑上与对应的S Function进行并流
// L is for Load, L Function will load data through KisConnector. Functions with the same Connector can logically merge with corresponding S Function.
L KisMode = "Load"

// C 为计算特征的KisFunction, 可以生成新的字段,计算新的值,进行数据的聚合,分析等
// C is for Calculate, which can generate new fields, calculate new values, and perform data aggregation, analysis, etc.
C KisMode = "Calculate"

// E 为扩展特征的KisFunction,作为流式计算的自定义特征Function,也同时是KisFlow当前流中的最后一个Function,概念类似Sink。
// E is for Expand, which serves as a custom feature Function for stream computing and is also the last Function in the current KisFlow, similar to Sink.
E KisMode = "Expand"
)

/*
是否启动Flow
*/
// KisOnOff Whether to enable the Flow
type KisOnOff int

const (
FlowEnable KisOnOff = 1 // 启动
FlowDisable KisOnOff = 0 // 不启动
// FlowEnable Enabled
FlowEnable KisOnOff = 1
// FlowDisable Disabled
FlowDisable KisOnOff = 0
)

// KisConnType represents the type of KisConnector
type KisConnType string

const (
// REDIS is the type of Redis
REDIS KisConnType = "redis"
// MYSQL is the type of MySQL
MYSQL KisConnType = "mysql"
// KAFKA is the type of Kafka
KAFKA KisConnType = "kafka"
TIDB KisConnType = "tidb"
ES KisConnType = "es"
// TIDB is the type of TiDB
TIDB KisConnType = "tidb"
// ES is the type of Elasticsearch
ES KisConnType = "es"
)

// cache
const (
// DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间
DeFaultFlowCacheCleanUp = 5 // 单位 min
// DefaultExpiration 默认GoCahce时间 ,永久保存
// DeFaultFlowCacheCleanUp is the default cleanup time for Cache in KisFlow's Flow object Cache
DeFaultFlowCacheCleanUp = 5 // unit: min
// DefaultExpiration is the default time for GoCahce, permanent storage
DefaultExpiration time.Duration = 0
)

// metrics
const (
METRICS_ROUTE string = "/metrics"
MetricsRoute string = "/metrics"

LABEL_FLOW_NAME string = "flow_name"
LABEL_FLOW_ID string = "flow_id"
LABEL_FUNCTION_NAME string = "func_name"
LABEL_FUNCTION_MODE string = "func_mode"
LabelFlowName string = "flow_name"
LabelFlowID string = "flow_id"
LabelFunctionName string = "func_name"
LabelFunctionMode string = "func_mode"

COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
CounterKisflowDataTotalName string = "kisflow_data_total"
CounterKisflowDataTotalHelp string = "Total data volume of all KisFlow Flows"

GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow各个FlowID数据流的数据数量总量"
GamgeFlowDataTotalName string = "flow_data_total"
GamgeFlowDataTotalHelp string = "Total data volume of each KisFlow FlowID data stream"

GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
GANGE_FLOW_SCHE_CNTS_HELP string = "KisFlow各个FlowID被调度的次数"
GangeFlowScheCntsName string = "flow_schedule_cnts"
GangeFlowScheCntsHelp string = "Number of times each KisFlow FlowID is scheduled"

GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
GANGE_FUNC_SCHE_CNTS_HELP string = "KisFlow各个Function被调度的次数"
GangeFuncScheCntsName string = "func_schedule_cnts"
GangeFuncScheCntsHelp string = "Number of times each KisFlow Function is scheduled"

HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
HISTOGRAM_FUNCTION_DURATION_HELP string = "Function执行耗时"
HistogramFunctionDurationName string = "func_run_duration"
HistogramFunctionDurationHelp string = "Function execution time"

HISTOGRAM_FLOW_DURATION_NAME string = "flow_run_duration"
HISTOGRAM_FLOW_DURATION_HELP string = "Flow执行耗时"
HistogramFlowDurationName string = "flow_run_duration"
HistogramFlowDurationHelp string = "Flow execution time"
)
12 changes: 5 additions & 7 deletions common/data_type.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package common

// KisRow 一行数据
// KisRow represents a single row of data
type KisRow interface{}

// KisRowArr 一次业务的批量数据
// KisRowArr represents a batch of data for a single business operation
type KisRowArr []KisRow

/*
KisDataMap 当前Flow承载的全部数据
key : 数据所在的Function ID
value: 对应的KisRow
*/
// KisDataMap contains all the data carried by the current Flow
// key : Function ID where the data resides
// value: Corresponding KisRow
type KisDataMap map[string]KisRowArr
34 changes: 14 additions & 20 deletions config/kis_conn_config.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,38 @@
package config

import (
"errors"
"fmt"

"github.com/aceld/kis-flow/common"
)

// KisConnConfig KisConnector 策略配置
// KisConnConfig describes the KisConnector strategy configuration
type KisConnConfig struct {
//配置类型
KisType string `yaml:"kistype"`
//唯一描述标识
CName string `yaml:"cname"`
//基础存储媒介地址
AddrString string `yaml:"addrs"`
//存储媒介引擎类型"Mysql" "Redis" "Kafka"等
Type common.KisConnType `yaml:"type"`
//一次存储的标识:如Redis为Key名称、Mysql为Table名称,Kafka为Topic名称等
Key string `yaml:"key"`
//配置信息中的自定义参数
Params map[string]string `yaml:"params"`
//存储读取所绑定的NsFuncionID
KisType string `yaml:"kistype"` // Configuration type
CName string `yaml:"cname"` // Unique descriptive identifier
AddrString string `yaml:"addrs"` // Base storage medium address
Type common.KisConnType `yaml:"type"` // Storage medium engine type: "Mysql", "Redis", "Kafka", etc.
Key string `yaml:"key"` // Identifier for a single storage: Key name for Redis, Table name for Mysql, Topic name for Kafka, etc.
Params map[string]string `yaml:"params"` // Custom parameters in the configuration information

// NsFuncionID bound to storage reading
Load []string `yaml:"load"`
Save []string `yaml:"save"`
}

// NewConnConfig 创建一个KisConnector策略配置对象, 用于描述一个KisConnector信息
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
// NewConnConfig creates a KisConnector strategy configuration object, used to describe a KisConnector information
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param map[string]string) *KisConnConfig {
strategy := new(KisConnConfig)
strategy.CName = cName
strategy.AddrString = addr

strategy.Type = t
strategy.Key = key
strategy.Params = param

return strategy
}

// WithFunc Connector与Function进行关系绑定
// WithFunc binds Connector to Function
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {

switch common.KisMode(fConfig.FMode) {
Expand All @@ -47,7 +41,7 @@ func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
case common.L:
cConfig.Load = append(cConfig.Load, fConfig.FName)
default:
return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
return fmt.Errorf("Wrong KisMode %s", fConfig.FMode)
}

return nil
Expand Down
12 changes: 6 additions & 6 deletions config/kis_flow_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package config

import "github.com/aceld/kis-flow/common"

// KisFlowFunctionParam 一个Flow配置中Function的Id及携带固定配置参数
// KisFlowFunctionParam represents the Id of a Function and carries fixed configuration parameters in a Flow configuration
type KisFlowFunctionParam struct {
FuncName string `yaml:"fname"` //必须
Params FParam `yaml:"params"` //选填,在当前Flow中Function定制固定配置参数
FuncName string `yaml:"fname"` // Required
Params FParam `yaml:"params"` // Optional, custom fixed configuration parameters for the Function in the current Flow
}

// KisFlowConfig 用户贯穿整条流式计算上下文环境的对象
// KisFlowConfig represents the object that spans the entire stream computing context environment
type KisFlowConfig struct {
KisType string `yaml:"kistype"`
Status int `yaml:"status"`
FlowName string `yaml:"flow_name"`
Flows []KisFlowFunctionParam `yaml:"flows"`
}

// NewFlowConfig 创建一个Flow策略配置对象, 用于描述一个KisFlow信息
// NewFlowConfig creates a Flow strategy configuration object, used to describe a KisFlow information
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
config := new(KisFlowConfig)
config.FlowName = flowName
Expand All @@ -27,7 +27,7 @@ func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
return config
}

// AppendFunctionConfig 添加一个Function Config 到当前Flow中
// AppendFunctionConfig adds a Function Config to the current Flow
func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {
fConfig.Flows = append(fConfig.Flows, params)
}
43 changes: 23 additions & 20 deletions config/kis_func_config.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
package config

import (
"errors"
"fmt"

"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/log"
)

// FParam 在当前Flow中Function定制固定配置参数类型
// FParam represents the type for custom fixed configuration parameters for the Function in the current Flow
type FParam map[string]string

// KisSource 表示当前Function的业务源
// KisSource represents the business source of the current Function
type KisSource struct {
Name string `yaml:"name"` // 本层Function的数据源描述
Must []string `yaml:"must"` // source必传字段
Name string `yaml:"name"` // Description of the data source for this layer Function
Must []string `yaml:"must"` // Required fields for the source
}

// KisFuncOption 可选配置
// KisFuncOption represents optional configurations
type KisFuncOption struct {
CName string `yaml:"cname"` // 连接器Connector名称
RetryTimes int `yaml:"retry_times"` // 选填,Function调度重试(不包括正常调度)最大次数
RetryDuration int `yaml:"return_duration"` // 选填,Function调度每次重试最大时间间隔(单位:ms)
Params FParam `yaml:"default_params"` // 选填,在当前Flow中Function定制固定配置参数
CName string `yaml:"cname"` // Connector name
RetryTimes int `yaml:"retry_times"` // Optional, maximum retry times for Function scheduling (excluding normal scheduling)
RetryDuration int `yaml:"return_duration"` // Optional, maximum time interval for each retry in Function scheduling (unit: ms)
Params FParam `yaml:"default_params"` // Optional, custom fixed configuration parameters for the Function in the current Flow
}

// KisFuncConfig 一个KisFunction策略配置
// KisFuncConfig represents a KisFunction strategy configuration
type KisFuncConfig struct {
KisType string `yaml:"kistype"`
FName string `yaml:"fname"`
Expand All @@ -33,7 +34,7 @@ type KisFuncConfig struct {
connConf *KisConnConfig
}

// NewFuncConfig 创建一个Function策略配置对象, 用于描述一个KisFunction信息
// NewFuncConfig creates a Function strategy configuration object, used to describe a KisFunction information
func NewFuncConfig(
funcName string, mode common.KisMode,
source *KisSource, option *KisFuncOption) *KisFuncConfig {
Expand All @@ -53,13 +54,13 @@ func NewFuncConfig(
config.FMode = string(mode)

/*
// FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系
// Functions S and L require the KisConnector parameters to be passed as they need to establish streaming relationships through Connector
if mode == common.S || mode == common.L {
if option == nil {
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
log.Logger().ErrorF("Function S/L needs option->Cid\n")
return nil
} else if option.CName == "" {
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
log.Logger().ErrorF("Function S/L needs option->Cid\n")
return nil
}
}
Expand All @@ -72,26 +73,28 @@ func NewFuncConfig(
return config
}

// AddConnConfig WithConn binds Function to Connector
func (fConf *KisFuncConfig) AddConnConfig(cConf *KisConnConfig) error {
if cConf == nil {
return errors.New("KisConnConfig is nil")
return fmt.Errorf("KisConnConfig is nil")
}

// Function需要和Connector进行关联
// Function needs to be associated with Connector
fConf.connConf = cConf

// Connector需要和Function进行关联
// Connector needs to be associated with Function
_ = cConf.WithFunc(fConf)

// 更新Function配置中的CName
// Update CName in Function configuration
fConf.Option.CName = cConf.CName

return nil
}

// GetConnConfig gets the Connector configuration
func (fConf *KisFuncConfig) GetConnConfig() (*KisConnConfig, error) {
if fConf.connConf == nil {
return nil, errors.New("KisFuncConfig.connConf not set")
return nil, fmt.Errorf("KisFuncConfig.connConf not set")
}

return fConf.connConf, nil
Expand Down
11 changes: 6 additions & 5 deletions config/kis_global_config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package config

// KisGlobalConfig represents the global configuration for KisFlow
type KisGlobalConfig struct {
//kistype Global为kisflow的全局配置
// KisType Global is the global configuration for kisflow
KisType string `yaml:"kistype"`
//是否启动prometheus监控
// EnableProm indicates whether to start Prometheus monitoring
EnableProm bool `yaml:"prometheus_enable"`
//是否需要kisflow单独启动端口监听
// PrometheusListen indicates whether kisflow needs to start a separate port for listening
PrometheusListen bool `yaml:"prometheus_listen"`
//prometheus取点监听地址
// PrometheusServe is the address for Prometheus scraping
PrometheusServe string `yaml:"prometheus_serve"`
}

// GlobalConfig 默认全局配置,全部均为关闭
// GlobalConfig is the default global configuration, all are set to off
var GlobalConfig = new(KisGlobalConfig)
Loading