We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
剖析目前Message机制以便后续优化,在路由这块需要整理,首先观察目前Message的定义,先忽略Header,展开MessageRoute
type Message struct { Header MessageHeader `json:"header"` Router MessageRoute `json:"route,omitempty"` Content interface{} `json:"content"` } // MessageRoute contains structure of message type MessageRoute struct { // 消息由哪个模块发出 Source string `json:"source,omitempty"` // 消息属于哪个广播组 Group string `json:"group,omitempty"` // 消息对Content的操作类型 Operation string `json:"operation,omitempty"` // 消息Content的资源类型,在边缘和云端有两种格式 // 边缘,kube-system/endpoints/kube-scheduller // 云端,node/edgenode-foo/kube-system/endpoints/kube-scheduller Resource string `json:"resource,omitempty"` }
消息定义中未含有DestinationModule,而是由消息发送时由函数参数给出,Send(module string, message model.Message),这里module == DestinationModule;消息定义中没有包含nodename,节点信息被压缩在Message.Resource当中 MessageContext
Send(module string, message model.Message)
// MessageContext is interface for message syncing type MessageContext interface { // async mode Send(module string, message model.Message) Receive(module string) (model.Message, error) // sync mode SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error) SendResp(message model.Message) // group broadcast SendToGroup(moduleType string, message model.Message) SendToGroupSync(moduleType string, message model.Message, timeout time.Duration) error }
对于点对点传输,无大碍。但如果消息是跨多个模块的,就需要在途径的模块加入路由逻辑,以将消息接力发送至目的模块,这大大影响了代码可读性。例如Edge的MetaManager模块向Cloud的EdgeController模块发送Query消息,以get指定的configmap 或 secret, 单看原始的消息发送代码,或是日志打印出的信息,是非常难以理解的的: processRemoteQuery
func (m *metaManager) processRemoteQuery(message model.Message) { go func() { // TODO: retry originalID := message.GetID() message.UpdateID() resp, err := beehiveContext.SendSync( string(metaManagerConfig.Config.ContextSendModule), // 此参数指向cloudhub模块, message, time.Duration(metaManagerConfig.Config.RemoteQueryTimeout)*time.Second) klog.Infof("########## process get: req[%+v], resp[%+v], err[%+v]", message, resp, err) ...
这里,SendSync()函数的参数表示其发向cloudhub模块,开发者希望的消息完整传输链路是,MetaManager -> EdgeHub -> CloudHub -> EdgeController.UpstreamController,要理解上述代码,首先需要先知晓写在中间两个模块的路由逻辑:
Publish
//cloud/pkg/cloudhub/channelq/channelq.go func (q *ChannelMessageQueue) Publish(msg *beehiveModel.Message) error { switch msg.Router.Source { case model.ResTwin: beehiveContext.SendToGroup(model.SrcDeviceController, *msg) default: beehiveContext.SendToGroup(model.SrcEdgeController, *msg) } return nil }
由于Send(),SendToGroup的module参数是string,Message的Source字段也是string,很难借助某个变量,搜索引用而跳转到中间模块的路由逻辑,而必须肉眼追踪,才能知道此消息发向的最终目的模块,做何处理。例如表示edgecontroller模块名,值为"edgecotroller"的常量目前有:
Content类型是空接口interface{},大多数情况其结构体类型是[]byte Content的装载对象,一般情况下都是一个具体的k8d对象,例如v1.Pod,对于进程内部通信,直接通过断言来将空接口转换成结构体对象十分方便。但是实际使用上,KubeEdge的消息大多都是云边间的跨进程通信,消息经由网络,经历以下过程,Message(结构体对象) -> protobuf(Message,转换成protobuf消息对象) -> 省略 ->网络 -> 省略 ->protobuf(Message) -> Message(结构体对象),其中在转换成protobuf消息对象中,Content会被json编码成[]byte,存入protobuf.Content中。同样消息经由protobuf消息对象转换而来时,Content也是由protobuf.Content赋值而来,类型为[]byte。
type Message struct { Header *MessageHeader `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` Router *MessageRouter `protobuf:"bytes,2,opt,name=router,proto3" json:"router,omitempty"` Content []byte `protobuf:"bytes,3,opt,name=Content,proto3" json:"Content,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (t *MessageTranslator) modelToProto(src *model.Message, dst *message.Message) error { dst.Header.ID = src.GetID() dst.Header.ParentID = src.GetParentID() dst.Header.Timestamp = int64(src.GetTimestamp()) dst.Header.Sync = src.IsSync() dst.Router.Source = src.GetSource() dst.Router.Group = src.GetGroup() dst.Router.Resouce = src.GetResource() dst.Router.Operaion = src.GetOperation() if content := src.GetContent(); content != nil { switch content.(type) { case []byte: dst.Content = content.([]byte) case string: dst.Content = []byte(content.(string)) default: bytes, err := json.Marshal(content) if err != nil { klog.Error("failed to marshal") return err } dst.Content = bytes } } return nil }
所以综上,但凡是跨云边传输的消息,接收端侧的Message.Content一定是[]byte类型。目前在Message.Content的使用上,也是默认其为[]byte类型,而后通过json.Unmarshal获取结构体对象。如果不是[]byte类型,则会强制使用json Marshal成[]byte,再json.Unmarshal。如此,考虑是否显式将Content类型定义为[]byte,而避免新开发者在此产生错误(尝试断言,而不是通过json.Unmarshal获取结构体对象)?
func (uc *UpstreamController) updateNodeStatus() { var data []byte switch msg.Content.(type) { case []byte: data = msg.GetContent().([]byte) default: var err error data, err = json.Marshal(msg.GetContent()) if err != nil { klog.Warningf("message: %s process failure, marshal message content with error: %s", msg.GetID(), err) continue } } ... node := &v1.Node{} err = json.Unmarshal(data, node) ... }
The text was updated successfully, but these errors were encountered:
No branches or pull requests
Message 机制剖析
剖析目前Message机制以便后续优化,在路由这块需要整理,首先观察目前Message的定义,先忽略Header,展开MessageRoute
消息定义中未含有DestinationModule,而是由消息发送时由函数参数给出,
Send(module string, message model.Message)
,这里module == DestinationModule;消息定义中没有包含nodename,节点信息被压缩在Message.Resource当中MessageContext
对于点对点传输,无大碍。但如果消息是跨多个模块的,就需要在途径的模块加入路由逻辑,以将消息接力发送至目的模块,这大大影响了代码可读性。例如Edge的MetaManager模块向Cloud的EdgeController模块发送Query消息,以get指定的configmap 或 secret, 单看原始的消息发送代码,或是日志打印出的信息,是非常难以理解的的:
processRemoteQuery
这里,SendSync()函数的参数表示其发向cloudhub模块,开发者希望的消息完整传输链路是,MetaManager -> EdgeHub -> CloudHub -> EdgeController.UpstreamController,要理解上述代码,首先需要先知晓写在中间两个模块的路由逻辑:
Publish
由于Send(),SendToGroup的module参数是string,Message的Source字段也是string,很难借助某个变量,搜索引用而跳转到中间模块的路由逻辑,而必须肉眼追踪,才能知道此消息发向的最终目的模块,做何处理。例如表示edgecontroller模块名,值为"edgecotroller"的常量目前有:
这些常量离散在不同的包中,无法统一。
Content类型是空接口interface{},大多数情况其结构体类型是[]byte
Content的装载对象,一般情况下都是一个具体的k8d对象,例如v1.Pod,对于进程内部通信,直接通过断言来将空接口转换成结构体对象十分方便。但是实际使用上,KubeEdge的消息大多都是云边间的跨进程通信,消息经由网络,经历以下过程,Message(结构体对象) -> protobuf(Message,转换成protobuf消息对象) -> 省略 ->网络 -> 省略 ->protobuf(Message) -> Message(结构体对象),其中在转换成protobuf消息对象中,Content会被json编码成[]byte,存入protobuf.Content中。同样消息经由protobuf消息对象转换而来时,Content也是由protobuf.Content赋值而来,类型为[]byte。
所以综上,但凡是跨云边传输的消息,接收端侧的Message.Content一定是[]byte类型。目前在Message.Content的使用上,也是默认其为[]byte类型,而后通过json.Unmarshal获取结构体对象。如果不是[]byte类型,则会强制使用json Marshal成[]byte,再json.Unmarshal。如此,考虑是否显式将Content类型定义为[]byte,而避免新开发者在此产生错误(尝试断言,而不是通过json.Unmarshal获取结构体对象)?
The text was updated successfully, but these errors were encountered: