diff --git a/.gitignore b/.gitignore index 47ee7680..88f888d9 100644 --- a/.gitignore +++ b/.gitignore @@ -29,5 +29,7 @@ _testmain.go samples/simple/client samples/simple/server +samples/ab/frame_client_ab +samples/ab/server socket/example/client socket/example/server \ No newline at end of file diff --git a/README.md b/README.md index 6e19a96d..1c96d776 100644 --- a/README.md +++ b/README.md @@ -241,7 +241,7 @@ func (p *AliasPlugin) Name() string { } // PostReadHeader converts the alias of this service. -func (p *AliasPlugin) PostReadHeader(ctx tp.ReadCtx) error { +func (p *AliasPlugin) PostReadHeader(ctx tp.ReadCtx) tp.Xerror { var u = ctx.Input().Header.Uri if p.Aliases != nil { if a = p.Aliases[u]; a != "" { diff --git a/README_ZH.md b/README_ZH.md index 6237dc74..3a1e1a05 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -242,7 +242,7 @@ func (p *AliasPlugin) Name() string { } // PostReadHeader converts the alias of this service. -func (p *AliasPlugin) PostReadHeader(ctx tp.ReadCtx) error { +func (p *AliasPlugin) PostReadHeader(ctx tp.ReadCtx) tp.Xerror { var u = ctx.Input().Header.Uri if p.Aliases != nil { if a = p.Aliases[u]; a != "" { diff --git a/context.go b/context.go index a08df0e0..41f0a3a6 100644 --- a/context.go +++ b/context.go @@ -233,12 +233,11 @@ func (c *readHandleCtx) binding(header *socket.Header) (body interface{}) { } func (c *readHandleCtx) bindPush(header *socket.Header) interface{} { - err := c.pluginContainer.PostReadHeader(c) + var err error + err = c.pluginContainer.PostReadHeader(c) if err != nil { - Errorf("%s", err.Error()) return nil } - c.uri, err = url.Parse(header.Uri) if err != nil { return nil @@ -256,7 +255,6 @@ func (c *readHandleCtx) bindPush(header *socket.Header) interface{} { err = c.pluginContainer.PreReadBody(c) if err != nil { - Errorf("%s", err.Error()) return nil } @@ -279,7 +277,6 @@ func (c *readHandleCtx) handlePush() { err := c.pluginContainer.PostReadBody(c) if err != nil { - Errorf("%s", err.Error()) return } @@ -297,15 +294,13 @@ func (c *readHandleCtx) bindPull(header *socket.Header) interface{} { c.output.HeaderCodec = c.input.HeaderCodec c.output.Header.Gzip = header.Gzip - err := c.pluginContainer.PostReadHeader(c) - if err != nil { - errStr := err.Error() - Errorf("%s", errStr) - c.output.Header.StatusCode = StatusFailedPlugin - c.output.Header.Status = errStr + xerr := c.pluginContainer.PostReadHeader(c) + if xerr != nil { + c.output.Header.StatusCode = xerr.Code() + c.output.Header.Status = xerr.Text() return nil } - + var err error c.uri, err = url.Parse(header.Uri) if err != nil { c.output.Header.StatusCode = StatusBadUri @@ -329,12 +324,10 @@ func (c *readHandleCtx) bindPull(header *socket.Header) interface{} { c.input.Body = c.arg.Interface() } - err = c.pluginContainer.PreReadBody(c) - if err != nil { - errStr := err.Error() - Errorf("%s", errStr) - c.output.Header.StatusCode = StatusFailedPlugin - c.output.Header.Status = errStr + xerr = c.pluginContainer.PreReadBody(c) + if xerr != nil { + c.output.Header.StatusCode = xerr.Code() + c.output.Header.Status = xerr.Text() return nil } @@ -352,13 +345,10 @@ func (c *readHandleCtx) handlePull() { c.cost = time.Since(c.start) c.sess.runlog(c.cost, c.input, c.output) }() - err := c.pluginContainer.PostReadBody(c) - if err != nil { - errStr := err.Error() - Errorf("%s", errStr) - c.output.Header.Status = errStr - c.output.Header.StatusCode = StatusFailedPlugin - c.output = nil + xerr := c.pluginContainer.PostReadBody(c) + if xerr != nil { + c.output.Header.StatusCode = xerr.Code() + c.output.Header.Status = xerr.Text() } // handle pull @@ -372,24 +362,14 @@ func (c *readHandleCtx) handlePull() { } // reply pull - if err = c.pluginContainer.PreWriteReply(c); err != nil { - errStr := err.Error() - c.output.Body = nil - if statusOK { - c.output.Header.StatusCode = StatusFailedPlugin - c.output.Header.Status = errStr - } - Errorf("%s", errStr) - } + c.pluginContainer.PreWriteReply(c) - if err = c.sess.write(c.output); err != nil { + if err := c.sess.write(c.output); err != nil { c.output.Header.StatusCode = StatusWriteFailed c.output.Header.Status = StatusText(StatusWriteFailed) + ": " + err.Error() } - if err = c.pluginContainer.PostWriteReply(c); err != nil { - Errorf("%s", err.Error()) - } + c.pluginContainer.PostWriteReply(c) } func (c *readHandleCtx) bindReply(header *socket.Header) interface{} { @@ -401,14 +381,14 @@ func (c *readHandleCtx) bindReply(header *socket.Header) interface{} { c.pullCmd = pullCmd.(*PullCmd) c.public = c.pullCmd.public - err := c.pluginContainer.PostReadHeader(c) - if err != nil { - c.pullCmd.Xerror = NewXerror(StatusFailedPlugin, err.Error()) + xerr := c.pluginContainer.PostReadHeader(c) + if xerr != nil { + c.pullCmd.Xerror = xerr return nil } - err = c.pluginContainer.PreReadBody(c) - if err != nil { - c.pullCmd.Xerror = NewXerror(StatusFailedPlugin, err.Error()) + xerr = c.pluginContainer.PreReadBody(c) + if xerr != nil { + c.pullCmd.Xerror = xerr return nil } return c.pullCmd.reply @@ -433,8 +413,8 @@ func (c *readHandleCtx) handleReply() { } if c.input.Header.StatusCode != StatusOK { c.pullCmd.Xerror = NewXerror(c.input.Header.StatusCode, c.input.Header.Status) - } else if err := c.pluginContainer.PostReadBody(c); err != nil { - c.pullCmd.Xerror = NewXerror(StatusFailedPlugin, err.Error()) + } else if xerr := c.pluginContainer.PostReadBody(c); xerr != nil { + c.pullCmd.Xerror = xerr } } @@ -446,12 +426,7 @@ func (c *readHandleCtx) handleUnsupported() { c.cost = time.Since(c.start) c.sess.runlog(c.cost, c.input, c.output) }() - err := c.pluginContainer.PostReadBody(c) - if err != nil { - errStr := err.Error() - Errorf("%s", errStr) - } - + c.pluginContainer.PostReadBody(c) c.output.Header.StatusCode = StatusUnsupportedTx c.output.Header.Status = StatusText(StatusUnsupportedTx) c.output.Body = nil @@ -460,18 +435,15 @@ func (c *readHandleCtx) handleUnsupported() { c.output.BodyCodec = c.input.BodyCodec } - if err = c.pluginContainer.PreWriteReply(c); err != nil { - Errorf("%s", err.Error()) - } + c.pluginContainer.PreWriteReply(c) - if err = c.sess.write(c.output); err != nil { + err := c.sess.write(c.output) + if err != nil { c.output.Header.StatusCode = StatusWriteFailed c.output.Header.Status = StatusText(StatusWriteFailed) + ": " + err.Error() } - if err = c.pluginContainer.PostWriteReply(c); err != nil { - Errorf("%s", err.Error()) - } + c.pluginContainer.PostWriteReply(c) } // InputBodyBytes if the input body binder is []byte type, returns it, else returns nil. diff --git a/parameter.go b/parameter.go index b9109f32..2cc70363 100644 --- a/parameter.go +++ b/parameter.go @@ -39,8 +39,11 @@ func TypeText(typ int32) string { } } -// Response Header status codes as registered with IANA. +// Common response header status as registered with IANA. +// Note: Recommended custom status code is greater than 1000. const ( + StatusUndefined = 0 + StatusWriteFailed = 100 StatusConnClosed = 101 @@ -52,7 +55,7 @@ const ( StatusConflict = 409 StatusUnsupportedTx = 410 StatusUnsupportedCodecType = 415 - StatusFailedPlugin = 424 + // StatusFailedPlugin = 424 StatusInternalServerError = 500 StatusNotImplemented = 501 @@ -67,7 +70,8 @@ const ( ) var statusText = map[int]string{ - StatusWriteFailed: "write failed", + StatusUndefined: "Undefined Status", + StatusWriteFailed: "Write Failed", StatusConnClosed: "Connection Closed", StatusOK: "OK", StatusBadUri: "Bad URI", @@ -76,7 +80,7 @@ var statusText = map[int]string{ StatusConflict: "Conflict", StatusUnsupportedTx: "Unsupported transaction type", StatusUnsupportedCodecType: "Unsupported codec type", - StatusFailedPlugin: "Failed Plugin", + // StatusFailedPlugin: "Failed Plugin", StatusInternalServerError: "Internal Server Error", StatusNotImplemented: "Not Implemented", diff --git a/peer.go b/peer.go index 60579606..cb169e63 100644 --- a/peer.go +++ b/peer.go @@ -106,13 +106,12 @@ func (p *Peer) Dial(addr string, id ...string) (Session, error) { } var sess = newSession(p, conn, id...) if err = p.pluginContainer.PostDial(sess); err != nil { - Printf("post dial fail (addr: %s, id: %s) ", addr, sess.Id()) sess.Close() return nil, err } Go(sess.startReadAndHandle) p.sessHub.Set(sess) - Printf("dial ok (addr: %s, id: %s)", addr, sess.Id()) + Infof("dial ok (addr: %s, id: %s)", addr, sess.Id()) return sess, nil } @@ -129,13 +128,12 @@ func (p *Peer) DialContext(ctx context.Context, addr string, id ...string) (Sess } var sess = newSession(p, conn, id...) if err = p.pluginContainer.PostDial(sess); err != nil { - Printf("post dial fail (addr: %s, id: %s) ", addr, sess.Id()) sess.Close() return nil, err } Go(sess.startReadAndHandle) p.sessHub.Set(sess) - Printf("dial ok (addr: %s, id: %s)", addr, sess.Id()) + Infof("dial ok (addr: %s, id: %s)", addr, sess.Id()) return sess, nil } @@ -212,7 +210,6 @@ func (p *Peer) listen(addr string) error { if !Go(func() { if err := p.pluginContainer.PostAccept(sess); err != nil { sess.Close() - Tracef("accept session(addr: %s, id: %s) error: %s", sess.RemoteIp(), sess.Id(), err.Error()) return } Tracef("accept session(addr: %s, id: %s) ok", sess.RemoteIp(), sess.Id()) diff --git a/plugin.go b/plugin.go index d718fee3..8cddae49 100644 --- a/plugin.go +++ b/plugin.go @@ -24,43 +24,43 @@ type ( Name() string } PostRegPlugin interface { - PostReg(*Handler) error + PostReg(*Handler) Xerror } PostDialPlugin interface { - PostDial(ForeSession) error + PostDial(ForeSession) Xerror } PostAcceptPlugin interface { - PostAccept(ForeSession) error + PostAccept(ForeSession) Xerror } PreWritePullPlugin interface { - PreWritePull(WriteCtx) error + PreWritePull(WriteCtx) Xerror } PostWritePullPlugin interface { - PostWritePull(WriteCtx) error + PostWritePull(WriteCtx) Xerror } PreWriteReplyPlugin interface { - PreWriteReply(WriteCtx) error + PreWriteReply(WriteCtx) Xerror } PostWriteReplyPlugin interface { - PostWriteReply(WriteCtx) error + PostWriteReply(WriteCtx) Xerror } PreWritePushPlugin interface { - PreWritePush(WriteCtx) error + PreWritePush(WriteCtx) Xerror } PostWritePushPlugin interface { - PostWritePush(WriteCtx) error + PostWritePush(WriteCtx) Xerror } PreReadHeaderPlugin interface { - PreReadHeader(ReadCtx) error + PreReadHeader(ReadCtx) Xerror } PostReadHeaderPlugin interface { - PostReadHeader(ReadCtx) error + PostReadHeader(ReadCtx) Xerror } PreReadBodyPlugin interface { - PreReadBody(ReadCtx) error + PreReadBody(ReadCtx) Xerror } PostReadBodyPlugin interface { - PostReadBody(ReadCtx) error + PostReadBody(ReadCtx) Xerror } // PluginContainer plugin container that defines base methods to manage plugins. @@ -171,145 +171,169 @@ func (p *pluginContainer) GetAll() []Plugin { return p.plugins } -func (p *pluginContainer) PostReg(h *Handler) error { - var errs []error +func (p *pluginContainer) PostReg(h *Handler) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PostRegPlugin); ok { - err := _plugin.PostReg(h) - if err != nil { - errs = append(errs, errors.Errorf("PostRegPlugin(%s): %s", plugin.Name(), err.Error())) + if xerr = _plugin.PostReg(h); xerr != nil { + Fatalf("%s-PostRegPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } - return errors.Merge(errs...) + return nil } -func (p *pluginContainer) PostDial(s ForeSession) error { +func (p *pluginContainer) PostDial(sess ForeSession) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PostDialPlugin); ok { - if err := _plugin.PostDial(s); err != nil { - return errors.Errorf("PostDialPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PostDial(sess); xerr != nil { + Debugf("dial fail (addr: %s, id: %s): %s-PostDialPlugin(%s)", sess.RemoteIp(), sess.Id(), plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PostAccept(s ForeSession) error { +func (p *pluginContainer) PostAccept(sess ForeSession) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PostAcceptPlugin); ok { - if err := _plugin.PostAccept(s); err != nil { - return errors.Errorf("PostAcceptPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PostAccept(sess); xerr != nil { + Debugf("accept session(addr: %s, id: %s): %s-PostAcceptPlugin(%s)", sess.RemoteIp(), sess.Id(), plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PreWritePull(ctx WriteCtx) error { +func (p *pluginContainer) PreWritePull(ctx WriteCtx) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PreWritePullPlugin); ok { - if err := _plugin.PreWritePull(ctx); err != nil { - return errors.Errorf("PreWritePullPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PreWritePull(ctx); xerr != nil { + Debugf("%s-PreWritePullPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PostWritePull(ctx WriteCtx) error { +func (p *pluginContainer) PostWritePull(ctx WriteCtx) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PostWritePullPlugin); ok { - if err := _plugin.PostWritePull(ctx); err != nil { - return errors.Errorf("PostWritePullPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PostWritePull(ctx); xerr != nil { + Errorf("%s-PostWritePullPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PreWriteReply(ctx WriteCtx) error { +func (p *pluginContainer) PreWriteReply(ctx WriteCtx) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PreWriteReplyPlugin); ok { - if err := _plugin.PreWriteReply(ctx); err != nil { - return errors.Errorf("PreWriteReplyPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PreWriteReply(ctx); xerr != nil { + Errorf("%s-PreWriteReplyPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PostWriteReply(ctx WriteCtx) error { +func (p *pluginContainer) PostWriteReply(ctx WriteCtx) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PostWriteReplyPlugin); ok { - if err := _plugin.PostWriteReply(ctx); err != nil { - return errors.Errorf("PostWriteReplyPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PostWriteReply(ctx); xerr != nil { + Errorf("%s-PostWriteReplyPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PreWritePush(ctx WriteCtx) error { +func (p *pluginContainer) PreWritePush(ctx WriteCtx) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PreWritePushPlugin); ok { - if err := _plugin.PreWritePush(ctx); err != nil { - return errors.Errorf("PreWritePushPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PreWritePush(ctx); xerr != nil { + Debugf("%s-PreWritePushPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PostWritePush(ctx WriteCtx) error { +func (p *pluginContainer) PostWritePush(ctx WriteCtx) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PostWritePushPlugin); ok { - if err := _plugin.PostWritePush(ctx); err != nil { - return errors.Errorf("PostWritePushPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PostWritePush(ctx); xerr != nil { + Debugf("%s-PostWritePushPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PreReadHeader(ctx ReadCtx) error { +func (p *pluginContainer) PreReadHeader(ctx ReadCtx) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PreReadHeaderPlugin); ok { - if err := _plugin.PreReadHeader(ctx); err != nil { - return errors.Errorf("PreReadHeaderPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PreReadHeader(ctx); xerr != nil { + Debugf("disconnected when reading: %s-PreReadHeaderPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PostReadHeader(ctx ReadCtx) error { +func (p *pluginContainer) PostReadHeader(ctx ReadCtx) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PostReadHeaderPlugin); ok { - if err := _plugin.PostReadHeader(ctx); err != nil { - return errors.Errorf("PostReadHeaderPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PostReadHeader(ctx); xerr != nil { + Errorf("%s-PostReadHeaderPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PreReadBody(ctx ReadCtx) error { +func (p *pluginContainer) PreReadBody(ctx ReadCtx) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PreReadBodyPlugin); ok { - if err := _plugin.PreReadBody(ctx); err != nil { - return errors.Errorf("PreReadBodyPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PreReadBody(ctx); xerr != nil { + Errorf("%s-PreReadBodyPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } return nil } -func (p *pluginContainer) PostReadBody(ctx ReadCtx) error { +func (p *pluginContainer) PostReadBody(ctx ReadCtx) Xerror { + var xerr Xerror for _, plugin := range p.plugins { if _plugin, ok := plugin.(PostReadBodyPlugin); ok { - if err := _plugin.PostReadBody(ctx); err != nil { - return errors.Errorf("PostReadBodyPlugin(%s): %s", plugin.Name(), err.Error()) + if xerr = _plugin.PostReadBody(ctx); xerr != nil { + Errorf("%s-PostReadBodyPlugin(%s)", plugin.Name(), xerr.Error()) + return xerr } } } diff --git a/router.go b/router.go index b0198a25..99223332 100644 --- a/router.go +++ b/router.go @@ -91,9 +91,7 @@ func (r *Router) Reg(ctrlStruct interface{}, plugin ...Plugin) { if _, ok := r.handlers[h.name]; ok { Fatalf("There is a %s handler conflict: %s", r.typ, h.name) } - if err = pluginContainer.PostReg(h); err != nil { - Fatalf("%v", err) - } + pluginContainer.PostReg(h) r.handlers[h.name] = h Printf("register %s handler: %s", r.typ, h.name) } diff --git a/session.go b/session.go index dbb6db52..1e061966 100644 --- a/session.go +++ b/session.go @@ -228,14 +228,12 @@ func (s *session) GoPull(uri string, args interface{}, reply interface{}, done c Errorf("panic:\n%v\n%s", p, goutil.PanicTrace(1)) } }() - + var err error s.pullCmdMap.Store(output.Header.Seq, cmd) - err := s.peer.pluginContainer.PreWritePull(cmd) + err = s.peer.pluginContainer.PreWritePull(cmd) if err == nil { if err = s.write(output); err == nil { - if err = s.peer.pluginContainer.PostWritePull(cmd); err != nil { - Errorf("%s", err.Error()) - } + s.peer.pluginContainer.PostWritePull(cmd) return } } @@ -342,7 +340,7 @@ func (s *session) startReadAndHandle() { err = s.peer.pluginContainer.PreReadHeader(ctx) if err != nil { s.peer.putContext(ctx, false) - s.readDisconnected(err) + s.readDisconnected(nil) return } @@ -493,7 +491,7 @@ func (s *session) readDisconnected(err error) { atomic.StoreInt32(&s.disconnected, 1) - if err != io.EOF && err != socket.ErrProactivelyCloseSocket { + if err != nil && err != io.EOF && err != socket.ErrProactivelyCloseSocket { Debugf("disconnected when reading: %s", err.Error()) } diff --git a/xerror.go b/xerror.go index c230315d..8ee3afd3 100644 --- a/xerror.go +++ b/xerror.go @@ -54,8 +54,16 @@ func (e *xerr) Text() string { func (e *xerr) Error() string { if len(e.json) == 0 { - b, _ := json.Marshal(e) - e.json = goutil.BytesToString(b) + json.Marshal(e) } return e.json } + +func (e *xerr) MarshalJSON() ([]byte, error) { + if len(e.json) == 0 { + b, _ := json.Marshal(*e) + e.json = goutil.BytesToString(b) + return b, nil + } + return goutil.StringToBytes(e.json), nil +}