Skip to content

Commit

Permalink
update example
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangkun committed Dec 3, 2021
1 parent b9343d4 commit 498a9a6
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 23 deletions.
30 changes: 23 additions & 7 deletions hookq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Consumer struct {
startOnce sync.Once
stopOnce sync.Once
stopChan chan struct{}
quitSignal chan struct{}
exitSignal chan struct{}
}

//NewConsumer new consumer
Expand Down Expand Up @@ -88,7 +88,7 @@ func NewConsumer(cnf *Cnf, mh MsgHandler, eh *ErrHandler) (*Consumer, error) {
submittedID: sid,
submittedTS: submittedTS,
stopChan: make(chan struct{}),
quitSignal: make(chan struct{}),
exitSignal: make(chan struct{}),
}
return c, nil
}
Expand All @@ -105,21 +105,37 @@ func (c *Consumer) Stop() {

//Wait : wait consumer quit
func (c *Consumer) Wait() {
<-c.quitSignal
<-c.exitSignal
}

//ExitSignal : exit consumer signal
func (c *Consumer) ExitSignal() <-chan struct{} {
return c.exitSignal
}

//start : start consumer in go routine
func (c *Consumer) start() {
go func() {
defer close(c.quitSignal)
defer close(c.exitSignal)
var timer *time.Timer
for {
c.process()
if timer == nil {
timer = time.NewTimer(c.tick)
} else {
if !timer.Stop() {
<-timer.C
}
timer.Reset(c.tick)
}
select {
case <-timer.C:
case <-c.stopChan:
c.submitWhenExit()
if !timer.Stop() {
<-timer.C
}
return
default:
c.process()
time.Sleep(c.tick)
}
}
}()
Expand Down
22 changes: 11 additions & 11 deletions hookq/example/bin/cnf.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
"charset": "utf8mb4",
"parseTime": "True",
"loc": "Local"
},
"logGorm": true,
"topicTbl": "topic.topic_example000",
"cursorTbl": "topic.topic_cursor_example000",
"id": "000",
"n": 500,
"tick": "10s",
"catchN": 1000,
"catchT": "600s",
"submitRetryNum": 10
}
}
},
"logGorm": true,
"topicTbl": "topic.topic_example000",
"cursorTbl": "topic.topic_cursor_example000",
"id": "000",
"n": 500,
"tick": "10s",
"catchN": 1000,
"catchT": "600s",
"submitRetryNum": 10
}
51 changes: 46 additions & 5 deletions hookq/example/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@ import (
"github.com/urfave/cli/v2"
"github.com/vmihailenco/msgpack/v5"
"log"
"os"
"os/signal"
"syscall"
)

type MsgHandler struct {
panicMode bool
}

func (m MsgHandler) Do(msg *hookq.Msg) error {
func NewMsgHandler(panicMode bool) hookq.MsgHandler {
return &MsgHandler{panicMode: panicMode}
}

func (m *MsgHandler) Do(msg *hookq.Msg) error {
log.Println("id:", msg.ID)
log.Println("protocol:", msg.P)
var a OrderAction
Expand All @@ -19,14 +27,17 @@ func (m MsgHandler) Do(msg *hookq.Msg) error {
return err
}
log.Printf("msg:{id:%d, state:%d}\n", a.ID, a.State)
if m.panicMode {
panic("self.panic")
}
return nil
}

type ErrorHandler struct {
category string
}

func NewErrorHandler(cat string) *ErrorHandler {
func NewErrorHandler(cat string) hookq.ErrDo {
return &ErrorHandler{category: cat}
}

Expand Down Expand Up @@ -65,6 +76,10 @@ var consumerCmd = &cli.Command{
Usage: "catch time duration",
Value: 0,
},
&cli.StringFlag{
Name: "m",
Usage: "mode -- n(tiny mode)/p(panic mode)/w(whole mode)",
},
},
Action: consumerMsg,
}
Expand All @@ -75,7 +90,15 @@ func consumerMsg(c *cli.Context) error {
return err
}
var consumer *hookq.Consumer
consumer, err = hookq.NewConsumer(cnf, MsgHandler{}, &hookq.ErrHandler{
var mode = c.String("m")
var msgHandler hookq.MsgHandler
if mode == "p" {
msgHandler = NewMsgHandler(true)
} else {
msgHandler = NewMsgHandler(false)
}

consumer, err = hookq.NewConsumer(cnf, msgHandler, &hookq.ErrHandler{
ReadErrHandler: NewErrorHandler("read"),
SubmitErrHandler: NewErrorHandler("submit"),
MsgErrHandler: NewErrorHandler("msg"),
Expand All @@ -84,8 +107,26 @@ func consumerMsg(c *cli.Context) error {
return err
}
consumer.Start()
consumer.Wait()
return nil

switch mode {
case "p", "w":
var signalChan = make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
select {
case <-signalChan:
consumer.Stop()
consumer.Wait()
log.Println("consumer.exit.by.signal")
return nil
case <-consumer.ExitSignal():
log.Println("consumer.exit.by.internal.error")
return nil
}
default:
consumer.Wait()
log.Println("consumer.default.quit")
return nil
}
}

func loadConsumerCnf(c *cli.Context) (*hookq.Cnf, error) {
Expand Down

0 comments on commit 498a9a6

Please sign in to comment.