diff --git a/hookq/consumer.go b/hookq/consumer.go index af77041..452fcb2 100644 --- a/hookq/consumer.go +++ b/hookq/consumer.go @@ -51,7 +51,7 @@ type Consumer struct { startOnce sync.Once stopOnce sync.Once stopChan chan struct{} - quitSignal chan struct{} + exitSignal chan struct{} } //NewConsumer new consumer @@ -88,7 +88,7 @@ func NewConsumer(cnf *Cnf, mh MsgHandler, eh *ErrHandler) (*Consumer, error) { submittedID: sid, submittedTS: submittedTS, stopChan: make(chan struct{}), - quitSignal: make(chan struct{}), + exitSignal: make(chan struct{}), } return c, nil } @@ -105,21 +105,37 @@ func (c *Consumer) Stop() { //Wait : wait consumer quit func (c *Consumer) Wait() { - <-c.quitSignal + <-c.exitSignal +} + +//ExitSignal : exit consumer signal +func (c *Consumer) ExitSignal() <-chan struct{} { + return c.exitSignal } //start : start consumer in go routine func (c *Consumer) start() { go func() { - defer close(c.quitSignal) + defer close(c.exitSignal) + var timer *time.Timer for { + c.process() + if timer == nil { + timer = time.NewTimer(c.tick) + } else { + if !timer.Stop() { + <-timer.C + } + timer.Reset(c.tick) + } select { + case <-timer.C: case <-c.stopChan: c.submitWhenExit() + if !timer.Stop() { + <-timer.C + } return - default: - c.process() - time.Sleep(c.tick) } } }() diff --git a/hookq/example/bin/cnf.json b/hookq/example/bin/cnf.json index d589465..33a5d55 100644 --- a/hookq/example/bin/cnf.json +++ b/hookq/example/bin/cnf.json @@ -9,15 +9,15 @@ "charset": "utf8mb4", "parseTime": "True", "loc": "Local" - }, - "logGorm": true, - "topicTbl": "topic.topic_example000", - "cursorTbl": "topic.topic_cursor_example000", - "id": "000", - "n": 500, - "tick": "10s", - "catchN": 1000, - "catchT": "600s", - "submitRetryNum": 10 - } + } + }, + "logGorm": true, + "topicTbl": "topic.topic_example000", + "cursorTbl": "topic.topic_cursor_example000", + "id": "000", + "n": 500, + "tick": "10s", + "catchN": 1000, + "catchT": "600s", + "submitRetryNum": 10 } diff --git a/hookq/example/consumer.go b/hookq/example/consumer.go index 26dd863..a37c153 100644 --- a/hookq/example/consumer.go +++ b/hookq/example/consumer.go @@ -5,12 +5,20 @@ import ( "github.com/urfave/cli/v2" "github.com/vmihailenco/msgpack/v5" "log" + "os" + "os/signal" + "syscall" ) type MsgHandler struct { + panicMode bool } -func (m MsgHandler) Do(msg *hookq.Msg) error { +func NewMsgHandler(panicMode bool) hookq.MsgHandler { + return &MsgHandler{panicMode: panicMode} +} + +func (m *MsgHandler) Do(msg *hookq.Msg) error { log.Println("id:", msg.ID) log.Println("protocol:", msg.P) var a OrderAction @@ -19,6 +27,9 @@ func (m MsgHandler) Do(msg *hookq.Msg) error { return err } log.Printf("msg:{id:%d, state:%d}\n", a.ID, a.State) + if m.panicMode { + panic("self.panic") + } return nil } @@ -26,7 +37,7 @@ type ErrorHandler struct { category string } -func NewErrorHandler(cat string) *ErrorHandler { +func NewErrorHandler(cat string) hookq.ErrDo { return &ErrorHandler{category: cat} } @@ -65,6 +76,10 @@ var consumerCmd = &cli.Command{ Usage: "catch time duration", Value: 0, }, + &cli.StringFlag{ + Name: "m", + Usage: "mode -- n(tiny mode)/p(panic mode)/w(whole mode)", + }, }, Action: consumerMsg, } @@ -75,7 +90,15 @@ func consumerMsg(c *cli.Context) error { return err } var consumer *hookq.Consumer - consumer, err = hookq.NewConsumer(cnf, MsgHandler{}, &hookq.ErrHandler{ + var mode = c.String("m") + var msgHandler hookq.MsgHandler + if mode == "p" { + msgHandler = NewMsgHandler(true) + } else { + msgHandler = NewMsgHandler(false) + } + + consumer, err = hookq.NewConsumer(cnf, msgHandler, &hookq.ErrHandler{ ReadErrHandler: NewErrorHandler("read"), SubmitErrHandler: NewErrorHandler("submit"), MsgErrHandler: NewErrorHandler("msg"), @@ -84,8 +107,26 @@ func consumerMsg(c *cli.Context) error { return err } consumer.Start() - consumer.Wait() - return nil + + switch mode { + case "p", "w": + var signalChan = make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + select { + case <-signalChan: + consumer.Stop() + consumer.Wait() + log.Println("consumer.exit.by.signal") + return nil + case <-consumer.ExitSignal(): + log.Println("consumer.exit.by.internal.error") + return nil + } + default: + consumer.Wait() + log.Println("consumer.default.quit") + return nil + } } func loadConsumerCnf(c *cli.Context) (*hookq.Cnf, error) {