Skip to content

Commit

Permalink
Upgrade plug-in, unified return tp.Xerror type
Browse files Browse the repository at this point in the history
  • Loading branch information
andeya committed Nov 4, 2017
1 parent db39a01 commit 7f806bb
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 137 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@ _testmain.go

samples/simple/client
samples/simple/server
samples/ab/frame_client_ab
samples/ab/server
socket/example/client
socket/example/server
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (p *AliasPlugin) Name() string {
}

// PostReadHeader converts the alias of this service.
func (p *AliasPlugin) PostReadHeader(ctx tp.ReadCtx) error {
func (p *AliasPlugin) PostReadHeader(ctx tp.ReadCtx) tp.Xerror {
var u = ctx.Input().Header.Uri
if p.Aliases != nil {
if a = p.Aliases[u]; a != "" {
Expand Down
2 changes: 1 addition & 1 deletion README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (p *AliasPlugin) Name() string {
}

// PostReadHeader converts the alias of this service.
func (p *AliasPlugin) PostReadHeader(ctx tp.ReadCtx) error {
func (p *AliasPlugin) PostReadHeader(ctx tp.ReadCtx) tp.Xerror {
var u = ctx.Input().Header.Uri
if p.Aliases != nil {
if a = p.Aliases[u]; a != "" {
Expand Down
90 changes: 31 additions & 59 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,11 @@ func (c *readHandleCtx) binding(header *socket.Header) (body interface{}) {
}

func (c *readHandleCtx) bindPush(header *socket.Header) interface{} {
err := c.pluginContainer.PostReadHeader(c)
var err error
err = c.pluginContainer.PostReadHeader(c)
if err != nil {
Errorf("%s", err.Error())
return nil
}

c.uri, err = url.Parse(header.Uri)
if err != nil {
return nil
Expand All @@ -256,7 +255,6 @@ func (c *readHandleCtx) bindPush(header *socket.Header) interface{} {

err = c.pluginContainer.PreReadBody(c)
if err != nil {
Errorf("%s", err.Error())
return nil
}

Expand All @@ -279,7 +277,6 @@ func (c *readHandleCtx) handlePush() {

err := c.pluginContainer.PostReadBody(c)
if err != nil {
Errorf("%s", err.Error())
return
}

Expand All @@ -297,15 +294,13 @@ func (c *readHandleCtx) bindPull(header *socket.Header) interface{} {
c.output.HeaderCodec = c.input.HeaderCodec
c.output.Header.Gzip = header.Gzip

err := c.pluginContainer.PostReadHeader(c)
if err != nil {
errStr := err.Error()
Errorf("%s", errStr)
c.output.Header.StatusCode = StatusFailedPlugin
c.output.Header.Status = errStr
xerr := c.pluginContainer.PostReadHeader(c)
if xerr != nil {
c.output.Header.StatusCode = xerr.Code()
c.output.Header.Status = xerr.Text()
return nil
}

var err error
c.uri, err = url.Parse(header.Uri)
if err != nil {
c.output.Header.StatusCode = StatusBadUri
Expand All @@ -329,12 +324,10 @@ func (c *readHandleCtx) bindPull(header *socket.Header) interface{} {
c.input.Body = c.arg.Interface()
}

err = c.pluginContainer.PreReadBody(c)
if err != nil {
errStr := err.Error()
Errorf("%s", errStr)
c.output.Header.StatusCode = StatusFailedPlugin
c.output.Header.Status = errStr
xerr = c.pluginContainer.PreReadBody(c)
if xerr != nil {
c.output.Header.StatusCode = xerr.Code()
c.output.Header.Status = xerr.Text()
return nil
}

Expand All @@ -352,13 +345,10 @@ func (c *readHandleCtx) handlePull() {
c.cost = time.Since(c.start)
c.sess.runlog(c.cost, c.input, c.output)
}()
err := c.pluginContainer.PostReadBody(c)
if err != nil {
errStr := err.Error()
Errorf("%s", errStr)
c.output.Header.Status = errStr
c.output.Header.StatusCode = StatusFailedPlugin
c.output = nil
xerr := c.pluginContainer.PostReadBody(c)
if xerr != nil {
c.output.Header.StatusCode = xerr.Code()
c.output.Header.Status = xerr.Text()
}

// handle pull
Expand All @@ -372,24 +362,14 @@ func (c *readHandleCtx) handlePull() {
}

// reply pull
if err = c.pluginContainer.PreWriteReply(c); err != nil {
errStr := err.Error()
c.output.Body = nil
if statusOK {
c.output.Header.StatusCode = StatusFailedPlugin
c.output.Header.Status = errStr
}
Errorf("%s", errStr)
}
c.pluginContainer.PreWriteReply(c)

if err = c.sess.write(c.output); err != nil {
if err := c.sess.write(c.output); err != nil {
c.output.Header.StatusCode = StatusWriteFailed
c.output.Header.Status = StatusText(StatusWriteFailed) + ": " + err.Error()
}

if err = c.pluginContainer.PostWriteReply(c); err != nil {
Errorf("%s", err.Error())
}
c.pluginContainer.PostWriteReply(c)
}

func (c *readHandleCtx) bindReply(header *socket.Header) interface{} {
Expand All @@ -401,14 +381,14 @@ func (c *readHandleCtx) bindReply(header *socket.Header) interface{} {
c.pullCmd = pullCmd.(*PullCmd)
c.public = c.pullCmd.public

err := c.pluginContainer.PostReadHeader(c)
if err != nil {
c.pullCmd.Xerror = NewXerror(StatusFailedPlugin, err.Error())
xerr := c.pluginContainer.PostReadHeader(c)
if xerr != nil {
c.pullCmd.Xerror = xerr
return nil
}
err = c.pluginContainer.PreReadBody(c)
if err != nil {
c.pullCmd.Xerror = NewXerror(StatusFailedPlugin, err.Error())
xerr = c.pluginContainer.PreReadBody(c)
if xerr != nil {
c.pullCmd.Xerror = xerr
return nil
}
return c.pullCmd.reply
Expand All @@ -433,8 +413,8 @@ func (c *readHandleCtx) handleReply() {
}
if c.input.Header.StatusCode != StatusOK {
c.pullCmd.Xerror = NewXerror(c.input.Header.StatusCode, c.input.Header.Status)
} else if err := c.pluginContainer.PostReadBody(c); err != nil {
c.pullCmd.Xerror = NewXerror(StatusFailedPlugin, err.Error())
} else if xerr := c.pluginContainer.PostReadBody(c); xerr != nil {
c.pullCmd.Xerror = xerr
}
}

Expand All @@ -446,12 +426,7 @@ func (c *readHandleCtx) handleUnsupported() {
c.cost = time.Since(c.start)
c.sess.runlog(c.cost, c.input, c.output)
}()
err := c.pluginContainer.PostReadBody(c)
if err != nil {
errStr := err.Error()
Errorf("%s", errStr)
}

c.pluginContainer.PostReadBody(c)
c.output.Header.StatusCode = StatusUnsupportedTx
c.output.Header.Status = StatusText(StatusUnsupportedTx)
c.output.Body = nil
Expand All @@ -460,18 +435,15 @@ func (c *readHandleCtx) handleUnsupported() {
c.output.BodyCodec = c.input.BodyCodec
}

if err = c.pluginContainer.PreWriteReply(c); err != nil {
Errorf("%s", err.Error())
}
c.pluginContainer.PreWriteReply(c)

if err = c.sess.write(c.output); err != nil {
err := c.sess.write(c.output)
if err != nil {
c.output.Header.StatusCode = StatusWriteFailed
c.output.Header.Status = StatusText(StatusWriteFailed) + ": " + err.Error()
}

if err = c.pluginContainer.PostWriteReply(c); err != nil {
Errorf("%s", err.Error())
}
c.pluginContainer.PostWriteReply(c)
}

// InputBodyBytes if the input body binder is []byte type, returns it, else returns nil.
Expand Down
12 changes: 8 additions & 4 deletions parameter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ func TypeText(typ int32) string {
}
}

// Response Header status codes as registered with IANA.
// Common response header status as registered with IANA.
// Note: Recommended custom status code is greater than 1000.
const (
StatusUndefined = 0

StatusWriteFailed = 100
StatusConnClosed = 101

Expand All @@ -52,7 +55,7 @@ const (
StatusConflict = 409
StatusUnsupportedTx = 410
StatusUnsupportedCodecType = 415
StatusFailedPlugin = 424
// StatusFailedPlugin = 424

StatusInternalServerError = 500
StatusNotImplemented = 501
Expand All @@ -67,7 +70,8 @@ const (
)

var statusText = map[int]string{
StatusWriteFailed: "write failed",
StatusUndefined: "Undefined Status",
StatusWriteFailed: "Write Failed",
StatusConnClosed: "Connection Closed",
StatusOK: "OK",
StatusBadUri: "Bad URI",
Expand All @@ -76,7 +80,7 @@ var statusText = map[int]string{
StatusConflict: "Conflict",
StatusUnsupportedTx: "Unsupported transaction type",
StatusUnsupportedCodecType: "Unsupported codec type",
StatusFailedPlugin: "Failed Plugin",
// StatusFailedPlugin: "Failed Plugin",

StatusInternalServerError: "Internal Server Error",
StatusNotImplemented: "Not Implemented",
Expand Down
7 changes: 2 additions & 5 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,12 @@ func (p *Peer) Dial(addr string, id ...string) (Session, error) {
}
var sess = newSession(p, conn, id...)
if err = p.pluginContainer.PostDial(sess); err != nil {
Printf("post dial fail (addr: %s, id: %s) ", addr, sess.Id())
sess.Close()
return nil, err
}
Go(sess.startReadAndHandle)
p.sessHub.Set(sess)
Printf("dial ok (addr: %s, id: %s)", addr, sess.Id())
Infof("dial ok (addr: %s, id: %s)", addr, sess.Id())
return sess, nil
}

Expand All @@ -129,13 +128,12 @@ func (p *Peer) DialContext(ctx context.Context, addr string, id ...string) (Sess
}
var sess = newSession(p, conn, id...)
if err = p.pluginContainer.PostDial(sess); err != nil {
Printf("post dial fail (addr: %s, id: %s) ", addr, sess.Id())
sess.Close()
return nil, err
}
Go(sess.startReadAndHandle)
p.sessHub.Set(sess)
Printf("dial ok (addr: %s, id: %s)", addr, sess.Id())
Infof("dial ok (addr: %s, id: %s)", addr, sess.Id())
return sess, nil
}

Expand Down Expand Up @@ -212,7 +210,6 @@ func (p *Peer) listen(addr string) error {
if !Go(func() {
if err := p.pluginContainer.PostAccept(sess); err != nil {
sess.Close()
Tracef("accept session(addr: %s, id: %s) error: %s", sess.RemoteIp(), sess.Id(), err.Error())
return
}
Tracef("accept session(addr: %s, id: %s) ok", sess.RemoteIp(), sess.Id())
Expand Down
Loading

0 comments on commit 7f806bb

Please sign in to comment.