diff --git a/internal/pkg/service/appevent/appevent.go b/internal/pkg/service/appevent/appevent.go index 3bf87220c..3ef55fb4c 100644 --- a/internal/pkg/service/appevent/appevent.go +++ b/internal/pkg/service/appevent/appevent.go @@ -3,6 +3,9 @@ package appevent import ( "context" "encoding/json" + "github.com/douyu/jupiter/pkg/store/gorm" + "github.com/douyu/jupiter/pkg/xlog" + "go.uber.org/zap" "strings" "time" @@ -27,7 +30,7 @@ type appEvent struct { topic string } -func InitAppEvent(eventProducer *rocketmq.Producer, topic string) *appEvent { +func InitAppEvent(eventProducer *rocketmq.Producer, topic string) { obj := &appEvent{ eventChan: make(chan db.AppEvent, 10000), eventProducer: eventProducer, @@ -35,7 +38,6 @@ func InitAppEvent(eventProducer *rocketmq.Producer, topic string) *appEvent { } go obj.ConsumeEvent() AppEvent = obj - return obj } func (a *appEvent) PutEvent(event db.AppEvent) { @@ -57,33 +59,27 @@ func (a *appEvent) ConsumeEvent() { } func (a *appEvent) insert(event db.AppEvent) error { - tx := invoker.JunoMysql.Begin() - if err := tx.Create(&event).Error; err != nil { - tx.Rollback() - return err - } - - if cfg.Cfg.JunoEvent.Rocketmq.Enable { - event.HandleOperationName() - event.HandleSourceName() - msg := &eventMessage{AppEvent: event, HostName: strings.Split(event.HostName, ",")} - eventMsg, _ := json.Marshal(&msg) - ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) - _, err := a.eventProducer.SendSync(ctx, primitive.NewMessage(a.topic, eventMsg)) - cancelFn() - if err != nil { - tx.Rollback() + err := invoker.JunoMysql.Transaction(func(tx *gorm.DB) error { + if err := tx.Create(&event).Error; err != nil { + xlog.Error("app event insert err", zap.Error(err)) return err } - } - - err := tx.Commit().Error - if err != nil { - return err - } - - //invoker.AppStatic.WithLabelValues(event.App, event.Source, event.Operation).Inc() - return nil + if cfg.Cfg.JunoEvent.Rocketmq.Enable { + event.HandleOperationName() + event.HandleSourceName() + msg := &eventMessage{AppEvent: event, HostName: strings.Split(event.HostName, ",")} + eventMsg, _ := json.Marshal(&msg) + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) + _, err := a.eventProducer.SendSync(ctx, primitive.NewMessage(a.topic, eventMsg)) + cancelFn() + if err != nil { + xlog.Error("app event eventProducer err", zap.Error(err)) + return err + } + } + return nil + }) + return err } func (a *appEvent) List(param view.ReqEventList) (res []db.AppEvent, page *view.Pagination, err error) {