From ad6423d005fb3f9f8a5dc0cacb5caaf4c0fad5da Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 13 Apr 2023 11:46:50 -0500 Subject: [PATCH 1/2] Use envelope struct for marshalling courier messages --- core/models/msgs.go | 139 ++++++++++++------------------ core/models/msgs_test.go | 172 +------------------------------------ core/msgio/courier.go | 106 +++++++++++++++++++---- core/msgio/courier_test.go | 38 +++----- core/msgio/send.go | 31 +++++-- 5 files changed, 184 insertions(+), 302 deletions(-) diff --git a/core/models/msgs.go b/core/models/msgs.go index ff5730241..f70d3a375 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -89,59 +89,61 @@ var unsendableToFailedReason = map[flows.UnsendableReason]MsgFailedReason{ // Msg is our type for mailroom messages type Msg struct { m struct { - ID flows.MsgID `db:"id" json:"id"` - BroadcastID BroadcastID `db:"broadcast_id" json:"broadcast_id,omitempty"` - UUID flows.MsgUUID `db:"uuid" json:"uuid"` - Text string `db:"text" json:"text"` - Attachments pq.StringArray `db:"attachments" json:"attachments,omitempty"` - QuickReplies pq.StringArray `db:"quick_replies" json:"quick_replies,omitempty"` - Locale envs.Locale `db:"locale" json:"locale,omitempty"` - HighPriority bool `db:"high_priority" json:"high_priority"` - CreatedOn time.Time `db:"created_on" json:"created_on"` - ModifiedOn time.Time `db:"modified_on" json:"modified_on"` - SentOn *time.Time `db:"sent_on" json:"sent_on"` - QueuedOn time.Time `db:"queued_on" json:"queued_on"` - Direction MsgDirection `db:"direction" json:"direction"` - Status MsgStatus `db:"status" json:"status"` - Visibility MsgVisibility `db:"visibility" json:"-"` - MsgType MsgType `db:"msg_type" json:"-"` - MsgCount int `db:"msg_count" json:"tps_cost"` - ErrorCount int `db:"error_count" json:"error_count"` - NextAttempt *time.Time `db:"next_attempt" json:"next_attempt"` - FailedReason MsgFailedReason `db:"failed_reason" json:"-"` - ExternalID null.String `db:"external_id" json:"-"` - ResponseToExternalID null.String ` json:"response_to_external_id,omitempty"` - Metadata null.Map `db:"metadata" json:"metadata,omitempty"` - ChannelID ChannelID `db:"channel_id" json:"channel_id"` - ChannelUUID assets.ChannelUUID ` json:"channel_uuid"` - ContactID ContactID `db:"contact_id" json:"contact_id"` - ContactURNID *URNID `db:"contact_urn_id" json:"contact_urn_id"` - IsResend bool ` json:"is_resend,omitempty"` - URN urns.URN `db:"urn_urn" json:"urn"` - URNAuth null.String `db:"urn_auth" json:"urn_auth,omitempty"` - OrgID OrgID `db:"org_id" json:"org_id"` - FlowID FlowID `db:"flow_id" json:"-"` - CreatedByID UserID `db:"created_by_id" json:"-"` - - // extra data from handling added to the courier payload - SessionID SessionID `json:"session_id,omitempty"` - SessionStatus SessionStatus `json:"session_status,omitempty"` - Flow *assets.FlowReference `json:"flow,omitempty"` - - // These fields are set on the last outgoing message in a session's sprint. In the case - // of the session being at a wait with a timeout then the timeout will be set. It is up to - // Courier to update the session's timeout appropriately after sending the message. - SessionWaitStartedOn *time.Time `json:"session_wait_started_on,omitempty"` - SessionTimeout int `json:"session_timeout,omitempty"` + ID flows.MsgID `db:"id"` + UUID flows.MsgUUID `db:"uuid"` + OrgID OrgID `db:"org_id"` + + // origin + BroadcastID BroadcastID `db:"broadcast_id"` + FlowID FlowID `db:"flow_id"` + CreatedByID UserID `db:"created_by_id"` + + // content + Text string `db:"text"` + Attachments pq.StringArray `db:"attachments"` + QuickReplies pq.StringArray `db:"quick_replies"` + Locale envs.Locale `db:"locale"` + + HighPriority bool `db:"high_priority"` + Direction MsgDirection `db:"direction"` + Status MsgStatus `db:"status"` + Visibility MsgVisibility `db:"visibility"` + MsgType MsgType `db:"msg_type"` + MsgCount int `db:"msg_count"` + CreatedOn time.Time `db:"created_on"` + ModifiedOn time.Time `db:"modified_on"` + ExternalID null.String `db:"external_id"` + Metadata null.Map `db:"metadata"` + ChannelID ChannelID `db:"channel_id"` + ContactID ContactID `db:"contact_id"` + ContactURNID *URNID `db:"contact_urn_id"` + URN urns.URN `db:"urn_urn"` + URNAuth null.String `db:"urn_auth"` + + SentOn *time.Time `db:"sent_on"` + QueuedOn time.Time `db:"queued_on"` + ErrorCount int `db:"error_count"` + NextAttempt *time.Time `db:"next_attempt"` + FailedReason MsgFailedReason `db:"failed_reason"` } - channel *Channel + // extra data added to the courier payload + Flow *assets.FlowReference `json:"flow,omitempty"` + ResponseToExternalID null.String `json:"response_to_external_id,omitempty"` + IsResend bool `json:"is_resend,omitempty"` + SessionID SessionID `json:"session_id,omitempty"` + SessionStatus SessionStatus `json:"session_status,omitempty"` + + // These fields are set on the last outgoing message in a session's sprint. In the case + // of the session being at a wait with a timeout then the timeout will be set. It is up to + // Courier to update the session's timeout appropriately after sending the message. + SessionWaitStartedOn *time.Time `json:"session_wait_started_on,omitempty"` + SessionTimeout int `json:"session_timeout,omitempty"` } func (m *Msg) ID() flows.MsgID { return m.m.ID } func (m *Msg) BroadcastID() BroadcastID { return m.m.BroadcastID } func (m *Msg) UUID() flows.MsgUUID { return m.m.UUID } -func (m *Msg) Channel() *Channel { return m.channel } func (m *Msg) Text() string { return m.m.Text } func (m *Msg) QuickReplies() []string { return m.m.QuickReplies } func (m *Msg) Locale() envs.Locale { return m.m.Locale } @@ -161,23 +163,18 @@ func (m *Msg) ExternalID() null.String { return m.m.ExternalID } func (m *Msg) Metadata() map[string]interface{} { return m.m.Metadata } func (m *Msg) MsgCount() int { return m.m.MsgCount } func (m *Msg) ChannelID() ChannelID { return m.m.ChannelID } -func (m *Msg) ChannelUUID() assets.ChannelUUID { return m.m.ChannelUUID } func (m *Msg) URN() urns.URN { return m.m.URN } func (m *Msg) URNAuth() null.String { return m.m.URNAuth } func (m *Msg) OrgID() OrgID { return m.m.OrgID } func (m *Msg) FlowID() FlowID { return m.m.FlowID } func (m *Msg) ContactID() ContactID { return m.m.ContactID } func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID } -func (m *Msg) IsResend() bool { return m.m.IsResend } func (m *Msg) SetChannel(channel *Channel) { - m.channel = channel if channel != nil { m.m.ChannelID = channel.ID() - m.m.ChannelUUID = channel.UUID() } else { m.m.ChannelID = NilChannelID - m.m.ChannelUUID = "" } } @@ -345,13 +342,13 @@ func newOutgoingTextMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact // if we have a session, set fields on the message from that if session != nil { - m.ResponseToExternalID = session.IncomingMsgExternalID() - m.SessionID = session.ID() - m.SessionStatus = session.Status() + msg.ResponseToExternalID = session.IncomingMsgExternalID() + msg.SessionID = session.ID() + msg.SessionStatus = session.Status() if flow != nil { m.FlowID = flow.ID() - m.Flow = flow.Reference() + msg.Flow = flow.Reference() } // if we're responding to an incoming message, send as high priority @@ -523,8 +520,6 @@ func loadMessages(ctx context.Context, db Queryer, sql string, params ...interfa defer rows.Close() msgs := make([]*Msg, 0) - channelIDsSeen := make(map[ChannelID]bool) - channelIDs := make([]ChannelID, 0, 5) for rows.Next() { msg := &Msg{} @@ -534,25 +529,6 @@ func loadMessages(ctx context.Context, db Queryer, sql string, params ...interfa } msgs = append(msgs, msg) - - if msg.ChannelID() != NilChannelID && !channelIDsSeen[msg.ChannelID()] { - channelIDsSeen[msg.ChannelID()] = true - channelIDs = append(channelIDs, msg.ChannelID()) - } - } - - channels, err := GetChannelsByID(ctx, db, channelIDs) - if err != nil { - return nil, errors.Wrap(err, "error fetching channels for messages") - } - - channelsByID := make(map[ChannelID]*Channel) - for _, ch := range channels { - channelsByID[ch.ID()] = ch - } - - for _, msg := range msgs { - msg.SetChannel(channelsByID[msg.m.ChannelID]) } return msgs, nil @@ -579,8 +555,8 @@ func NormalizeAttachment(cfg *runtime.Config, attachment utils.Attachment) utils // SetTimeout sets the timeout for this message func (m *Msg) SetTimeout(start time.Time, timeout time.Duration) { - m.m.SessionWaitStartedOn = &start - m.m.SessionTimeout = int(timeout / time.Second) + m.SessionWaitStartedOn = &start + m.SessionTimeout = int(timeout / time.Second) } // InsertMessages inserts the passed in messages in a single query @@ -695,24 +671,19 @@ func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAsse if ch != nil { channel := oa.ChannelByUUID(ch.UUID()) - msg.channel = channel - msg.m.ChannelID = channel.ID() - msg.m.ChannelUUID = channel.UUID() msg.m.Status = MsgStatusPending msg.m.QueuedOn = dates.Now() msg.m.SentOn = nil msg.m.ErrorCount = 0 msg.m.FailedReason = "" - msg.m.IsResend = true // mark message as being a resend so it will be queued to courier as such + msg.IsResend = true // mark message as being a resend so it will be queued to courier as such resends = append(resends, msg.m) resent = append(resent, msg) } else { // if we don't have channel or a URN, fail again - msg.channel = nil msg.m.ChannelID = NilChannelID - msg.m.ChannelUUID = assets.ChannelUUID("") msg.m.Status = MsgStatusFailed msg.m.QueuedOn = dates.Now() msg.m.SentOn = nil diff --git a/core/models/msgs_test.go b/core/models/msgs_test.go index ad1e96555..10da776e4 100644 --- a/core/models/msgs_test.go +++ b/core/models/msgs_test.go @@ -2,19 +2,16 @@ package models_test import ( "context" - "encoding/json" "fmt" "testing" "time" "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/gocommon/dbutil/assertdb" - "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/envs" "github.com/nyaruka/goflow/flows" - "github.com/nyaruka/goflow/test" "github.com/nyaruka/goflow/utils" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/runtime" @@ -241,169 +238,6 @@ func TestNewOutgoingFlowMsg(t *testing.T) { } } -func TestMarshalMsg(t *testing.T) { - ctx, rt := testsuite.Runtime() - - defer testsuite.Reset(testsuite.ResetData) - - assertdb.Query(t, rt.DB, `SELECT count(*) FROM orgs_org WHERE is_suspended = TRUE`).Returns(0) - - oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID) - require.NoError(t, err) - require.False(t, oa.Org().Suspended()) - - channel := oa.ChannelByUUID(testdata.TwilioChannel.UUID) - flow, _ := oa.FlowByID(testdata.Favorites.ID) - urn := urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", testdata.Cathy.URNID)) - flowMsg1 := flows.NewMsgOut( - urn, - assets.NewChannelReference(testdata.TwilioChannel.UUID, "Test Channel"), - "Hi there", - []utils.Attachment{utils.Attachment("image/jpeg:https://dl-foo.com/image.jpg")}, - []string{"yes", "no"}, - flows.NewMsgTemplating(assets.NewTemplateReference("4474d39c-ac2c-486d-bceb-8a774a515299", "tpl"), []string{"name"}, "tpls"), - flows.MsgTopicPurchase, - envs.Locale(`eng-US`), - flows.NilUnsendableReason, - ) - - // create a non-priority flow message.. i.e. the session isn't responding to an incoming message - session := insertTestSession(t, ctx, rt, testdata.Org1, testdata.Cathy, testdata.Favorites) - msg1, err := models.NewOutgoingFlowMsg(rt, oa.Org(), channel, session, flow, flowMsg1, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC)) - require.NoError(t, err) - - cathy := session.Contact() - - err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg1}) - require.NoError(t, err) - - marshaled, err := json.Marshal(msg1) - assert.NoError(t, err) - - test.AssertEqualJSON(t, []byte(fmt.Sprintf(`{ - "attachments": [ - "image/jpeg:https://dl-foo.com/image.jpg" - ], - "channel_id": 10000, - "channel_uuid": "74729f45-7f29-4868-9dc4-90e491e3c7d8", - "contact_id": 10000, - "contact_urn_id": 10000, - "created_on": "2021-11-09T14:03:30Z", - "direction": "O", - "error_count": 0, - "flow": {"uuid": "9de3663f-c5c5-4c92-9f45-ecbc09abcc85", "name": "Favorites"}, - "high_priority": false, - "id": %d, - "locale": "eng-US", - "metadata": { - "templating": { - "namespace": "tpls", - "template": {"name": "tpl", "uuid": "4474d39c-ac2c-486d-bceb-8a774a515299"}, - "variables": ["name"] - }, - "topic": "purchase" - }, - "modified_on": %s, - "next_attempt": null, - "org_id": 1, - "queued_on": %s, - "quick_replies": [ - "yes", - "no" - ], - "sent_on": null, - "session_id": %d, - "session_status": "W", - "status": "Q", - "text": "Hi there", - "tps_cost": 2, - "urn": "tel:+250700000001?id=10000", - "uuid": "%s" - }`, msg1.ID(), jsonx.MustMarshal(msg1.ModifiedOn()), jsonx.MustMarshal(msg1.QueuedOn()), session.ID(), msg1.UUID())), marshaled) - - // create a priority flow message.. i.e. the session is responding to an incoming message - flowMsg2 := flows.NewMsgOut( - urn, - assets.NewChannelReference(testdata.TwilioChannel.UUID, "Test Channel"), - "Hi there", - nil, nil, nil, - flows.NilMsgTopic, - envs.NilLocale, - flows.NilUnsendableReason, - ) - in1 := testdata.InsertIncomingMsg(rt, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "test", models.MsgStatusHandled) - session.SetIncomingMsg(models.MsgID(in1.ID()), null.String("EX123")) - msg2, err := models.NewOutgoingFlowMsg(rt, oa.Org(), channel, session, flow, flowMsg2, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC)) - require.NoError(t, err) - - err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg2}) - require.NoError(t, err) - - marshaled, err = json.Marshal(msg2) - assert.NoError(t, err) - - test.AssertEqualJSON(t, []byte(fmt.Sprintf(`{ - "channel_id": 10000, - "channel_uuid": "74729f45-7f29-4868-9dc4-90e491e3c7d8", - "contact_id": 10000, - "contact_urn_id": 10000, - "created_on": "2021-11-09T14:03:30Z", - "direction": "O", - "error_count": 0, - "flow": {"uuid": "9de3663f-c5c5-4c92-9f45-ecbc09abcc85", "name": "Favorites"}, - "response_to_external_id": "EX123", - "high_priority": true, - "id": %d, - "modified_on": %s, - "next_attempt": null, - "org_id": 1, - "queued_on": %s, - "sent_on": null, - "session_id": %d, - "session_status": "W", - "status": "Q", - "text": "Hi there", - "tps_cost": 1, - "urn": "tel:+250700000001?id=10000", - "uuid": "%s" - }`, msg2.ID(), jsonx.MustMarshal(msg2.ModifiedOn()), jsonx.MustMarshal(msg2.QueuedOn()), session.ID(), msg2.UUID())), marshaled) - - // try a broadcast message which won't have session and flow fields set - bcastID := testdata.InsertBroadcast(rt, testdata.Org1, `eng`, map[envs.Language]string{`eng`: "Blast"}, models.NilScheduleID, []*testdata.Contact{testdata.Cathy}, nil) - bcastMsg1 := flows.NewMsgOut(urn, assets.NewChannelReference(testdata.TwilioChannel.UUID, "Test Channel"), "Blast", nil, nil, nil, flows.NilMsgTopic, envs.NilLocale, flows.NilUnsendableReason) - msg3, err := models.NewOutgoingBroadcastMsg(rt, oa.Org(), channel, cathy, bcastMsg1, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC), &models.BroadcastBatch{BroadcastID: bcastID, CreatedByID: testdata.Admin.ID}) - require.NoError(t, err) - - err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg2}) - require.NoError(t, err) - - marshaled, err = json.Marshal(msg3) - assert.NoError(t, err) - - test.AssertEqualJSON(t, []byte(fmt.Sprintf(`{ - "broadcast_id": %d, - "channel_id": 10000, - "channel_uuid": "74729f45-7f29-4868-9dc4-90e491e3c7d8", - "contact_id": 10000, - "contact_urn_id": 10000, - "created_on": "2021-11-09T14:03:30Z", - "direction": "O", - "error_count": 0, - "high_priority": false, - "id": %d, - "modified_on": %s, - "next_attempt": null, - "org_id": 1, - "queued_on": %s, - "sent_on": null, - "status": "Q", - "text": "Blast", - "tps_cost": 1, - "urn": "tel:+250700000001?id=10000", - "uuid": "%s" - }`, bcastID, msg3.ID(), jsonx.MustMarshal(msg3.ModifiedOn()), jsonx.MustMarshal(msg3.QueuedOn()), msg3.UUID())), marshaled) -} - func TestGetMessagesByID(t *testing.T) { ctx, rt := testsuite.Runtime() @@ -476,11 +310,11 @@ func TestResendMessages(t *testing.T) { assert.Len(t, resent, 3) // only #1, #2 and #3 can be resent // both messages should now have a channel and be marked for resending - assert.True(t, resent[0].IsResend()) + assert.True(t, resent[0].IsResend) assert.Equal(t, testdata.TwilioChannel.ID, resent[0].ChannelID()) - assert.True(t, resent[1].IsResend()) + assert.True(t, resent[1].IsResend) assert.Equal(t, testdata.VonageChannel.ID, resent[1].ChannelID()) // channel changed - assert.True(t, resent[2].IsResend()) + assert.True(t, resent[2].IsResend) assert.Equal(t, testdata.TwilioChannel.ID, resent[2].ChannelID()) // channel added assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'Q' AND queued_on > $1 AND sent_on IS NULL`, now).Returns(3) diff --git a/core/msgio/courier.go b/core/msgio/courier.go index 789c8ed43..16e25f92c 100644 --- a/core/msgio/courier.go +++ b/core/msgio/courier.go @@ -15,9 +15,12 @@ import ( "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/assets" + "github.com/nyaruka/goflow/envs" + "github.com/nyaruka/goflow/flows" "github.com/nyaruka/goflow/utils" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/runtime" + "github.com/nyaruka/null/v2" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -31,6 +34,77 @@ const ( highPriority = 1 ) +type Msg struct { + ID flows.MsgID `json:"id"` + UUID flows.MsgUUID `json:"uuid"` + OrgID models.OrgID `json:"org_id"` + Text string `json:"text"` + Attachments []utils.Attachment `json:"attachments,omitempty"` + QuickReplies []string `json:"quick_replies,omitempty"` + Locale envs.Locale `json:"locale,omitempty"` + HighPriority bool `json:"high_priority"` + Direction models.MsgDirection `json:"direction"` + Status models.MsgStatus `json:"status"` + MsgCount int `json:"tps_cost"` + CreatedOn time.Time `json:"created_on"` + ModifiedOn time.Time `json:"modified_on"` + ChannelID models.ChannelID `json:"channel_id"` + ChannelUUID assets.ChannelUUID `json:"channel_uuid"` + ContactID models.ContactID `json:"contact_id"` + ContactURNID *models.URNID `json:"contact_urn_id"` + URN urns.URN `json:"urn"` + URNAuth null.String `json:"urn_auth,omitempty"` + SentOn *time.Time `json:"sent_on"` + QueuedOn time.Time `json:"queued_on"` + ErrorCount int `json:"error_count"` + NextAttempt *time.Time `json:"next_attempt"` + Metadata null.Map `json:"metadata,omitempty"` + Flow *assets.FlowReference `json:"flow,omitempty"` + ResponseToExternalID string `json:"response_to_external_id,omitempty"` + IsResend bool `json:"is_resend,omitempty"` + + SessionID models.SessionID `json:"session_id,omitempty"` + SessionStatus models.SessionStatus `json:"session_status,omitempty"` + SessionWaitStartedOn *time.Time `json:"session_wait_started_on,omitempty"` + SessionTimeout int `json:"session_timeout,omitempty"` +} + +func NewMsg(m *models.Msg, ch *models.Channel) *Msg { + return &Msg{ + ID: m.ID(), + UUID: m.UUID(), + OrgID: m.OrgID(), + Text: m.Text(), + Attachments: m.Attachments(), + QuickReplies: m.QuickReplies(), + Locale: m.Locale(), + HighPriority: m.HighPriority(), + Direction: m.Direction(), + Status: m.Status(), + MsgCount: m.MsgCount(), + CreatedOn: m.CreatedOn(), + ModifiedOn: m.ModifiedOn(), + ChannelID: m.ChannelID(), + ChannelUUID: ch.UUID(), + ContactID: m.ContactID(), + ContactURNID: m.ContactURNID(), + URN: m.URN(), + URNAuth: m.URNAuth(), + SentOn: m.SentOn(), + QueuedOn: m.QueuedOn(), + ErrorCount: m.ErrorCount(), + NextAttempt: m.NextAttempt(), + Metadata: m.Metadata(), + Flow: m.Flow, + ResponseToExternalID: string(m.ResponseToExternalID), + IsResend: m.IsResend, + SessionID: m.SessionID, + SessionStatus: m.SessionStatus, + SessionWaitStartedOn: m.SessionWaitStartedOn, + SessionTimeout: m.SessionTimeout, + } +} + var queuePushScript = redis.NewScript(6, ` -- KEYS: [QueueType, QueueName, TPS, Priority, Items, EpochSecs] local queueType, queueName, tps, priority, items, epochSecs = KEYS[1], KEYS[2], tonumber(KEYS[3]), KEYS[4], KEYS[5], KEYS[6] @@ -61,11 +135,17 @@ end `) // PushCourierBatch pushes a batch of messages for a single contact and channel onto the appropriate courier queue -func PushCourierBatch(rc redis.Conn, ch *models.Channel, batch []*models.Msg, timestamp string) error { +func PushCourierBatch(rc redis.Conn, ch *models.Channel, msgs []*models.Msg, timestamp string) error { priority := bulkPriority - if batch[0].HighPriority() { + if msgs[0].HighPriority() { priority = highPriority } + + batch := make([]*Msg, len(msgs)) + for i, m := range msgs { + batch[i] = NewMsg(m, ch) + } + batchJSON := jsonx.MustMarshal(batch) _, err := queuePushScript.Do(rc, "msgs", ch.UUID(), ch.TPS(), priority, batchJSON, timestamp) @@ -73,7 +153,7 @@ func PushCourierBatch(rc redis.Conn, ch *models.Channel, batch []*models.Msg, ti } // QueueCourierMessages queues messages for a single contact to Courier -func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*models.Msg) error { +func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, channel *models.Channel, msgs []*models.Msg) error { if len(msgs) == 0 { return nil } @@ -83,44 +163,36 @@ func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*mod now := dates.Now() epochSeconds := strconv.FormatFloat(float64(now.UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64) - // we batch msgs by channel uuid + // we batch msgs by priority batch := make([]*models.Msg, 0, len(msgs)) - currentChannel := msgs[0].Channel() + currentPriority := msgs[0].HighPriority() // commits our batch to redis commitBatch := func() error { if len(batch) > 0 { start := time.Now() - err := PushCourierBatch(rc, currentChannel, batch, epochSeconds) + err := PushCourierBatch(rc, channel, batch, epochSeconds) if err != nil { return err } - logrus.WithFields(logrus.Fields{ - "msgs": len(batch), - "contact_id": contactID, - "channel_uuid": currentChannel.UUID(), - "elapsed": time.Since(start), - }).Info("msgs queued to courier") + logrus.WithFields(logrus.Fields{"msgs": len(batch), "contact_id": contactID, "channel_uuid": channel.UUID(), "elapsed": time.Since(start)}).Info("msgs queued to courier") } return nil } for _, msg := range msgs { // sanity check the state of the msg we're about to queue... - assert(msg.Channel() != nil && msg.ChannelUUID() != "", "can't queue a message to courier without a channel") - assert(msg.Channel().Type() != models.ChannelTypeAndroid, "can't queue an android message to courier") assert(msg.URN() != urns.NilURN && msg.ContactURNID() != nil, "can't queue a message to courier without a URN") - // if this msg is the same channel and priority, add to current batch, otherwise start new batch - if msg.Channel() == currentChannel && msg.HighPriority() == currentPriority { + // if this msg is the same priority, add to current batch, otherwise start new batch + if msg.HighPriority() == currentPriority { batch = append(batch, msg) } else { if err := commitBatch(); err != nil { return err } - currentChannel = msg.Channel() currentPriority = msg.HighPriority() batch = []*models.Msg{msg} } diff --git a/core/msgio/courier_test.go b/core/msgio/courier_test.go index b5472740d..fb4f94cc0 100644 --- a/core/msgio/courier_test.go +++ b/core/msgio/courier_test.go @@ -22,14 +22,13 @@ func TestQueueCourierMessages(t *testing.T) { defer testsuite.Reset(testsuite.ResetData | testsuite.ResetRedis) - // create an Andoid channel - androidChannel := testdata.InsertChannel(rt, testdata.Org1, "A", "Android 1", []string{"tel"}, "SR", map[string]interface{}{"FCM_ID": "FCMID"}) - oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshOrg|models.RefreshChannels) require.NoError(t, err) + twilio := oa.ChannelByUUID(testdata.TwilioChannel.UUID) + // noop if no messages provided - msgio.QueueCourierMessages(rc, testdata.Cathy.ID, []*models.Msg{}) + msgio.QueueCourierMessages(rc, testdata.Cathy.ID, twilio, []*models.Msg{}) testsuite.AssertCourierQueues(t, map[string][]int{}) // queue 3 messages for Cathy.. @@ -37,27 +36,13 @@ func TestQueueCourierMessages(t *testing.T) { (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa), (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa), (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy, HighPriority: true}).createMsg(t, rt, oa), - (&msgSpec{Channel: testdata.VonageChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa), } - msgio.QueueCourierMessages(rc, testdata.Cathy.ID, msgs) + msgio.QueueCourierMessages(rc, testdata.Cathy.ID, twilio, msgs) testsuite.AssertCourierQueues(t, map[string][]int{ "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {2}, // twilio, bulk priority "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/1": {1}, // twilio, high priority - "msgs:19012bfd-3ce3-4cae-9bb9-76cf92c73d49|10/0": {1}, // vonage, bulk priority - }) - - // check that trying to queue a message without a channel will panic - assert.Panics(t, func() { - ms := msgSpec{Channel: nil, Contact: testdata.Cathy} - msgio.QueueCourierMessages(rc, testdata.Cathy.ID, []*models.Msg{ms.createMsg(t, rt, oa)}) - }) - - // check that trying to queue an Android message will panic - assert.Panics(t, func() { - ms := msgSpec{Channel: androidChannel, Contact: testdata.Cathy} - msgio.QueueCourierMessages(rc, testdata.Cathy.ID, []*models.Msg{ms.createMsg(t, rt, oa)}) }) } @@ -71,15 +56,20 @@ func TestClearChannelCourierQueue(t *testing.T) { oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshOrg|models.RefreshChannels) require.NoError(t, err) - // queue 3 messages for Cathy.. - msgs := []*models.Msg{ + twilio := oa.ChannelByUUID(testdata.TwilioChannel.UUID) + vonage := oa.ChannelByUUID(testdata.VonageChannel.UUID) + + // queue 3 Twilio messages for Cathy.. + msgio.QueueCourierMessages(rc, testdata.Cathy.ID, twilio, []*models.Msg{ (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa), (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa), (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy, HighPriority: true}).createMsg(t, rt, oa), - (&msgSpec{Channel: testdata.VonageChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa), - } + }) - msgio.QueueCourierMessages(rc, testdata.Cathy.ID, msgs) + // and a Vonage message + msgio.QueueCourierMessages(rc, testdata.Cathy.ID, vonage, []*models.Msg{ + (&msgSpec{Channel: testdata.VonageChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa), + }) testsuite.AssertCourierQueues(t, map[string][]int{ "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {2}, // twilio, bulk priority diff --git a/core/msgio/send.go b/core/msgio/send.go index a47f0bb04..387309ed8 100644 --- a/core/msgio/send.go +++ b/core/msgio/send.go @@ -6,13 +6,19 @@ import ( "github.com/edganiukov/fcm" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/runtime" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) +type contactAndChannel struct { + contactID models.ContactID + channel *models.Channel +} + // SendMessages tries to send the given messages via Courier or Android syncing -func SendMessages(ctx context.Context, rt *runtime.Runtime, tx models.Queryer, fc *fcm.Client, msgs []*models.Msg) { - // messages to be sent by courier, organized by contact - courierMsgs := make(map[models.ContactID][]*models.Msg, 100) +func SendMessages(ctx context.Context, rt *runtime.Runtime, tx models.Queryer, fc *fcm.Client, msgs []*models.Msg) error { + // messages to be sent by courier, organized by contact+channel + courierMsgs := make(map[contactAndChannel][]*models.Msg, 100) // android channels that need to be notified to sync androidChannels := make([]*models.Channel, 0, 5) @@ -28,7 +34,13 @@ func SendMessages(ctx context.Context, rt *runtime.Runtime, tx models.Queryer, f continue } - channel := msg.Channel() + oa, err := models.GetOrgAssets(ctx, rt, msg.OrgID()) + if err != nil { + return errors.Wrap(err, "unable to load org assets") + } + + channel := oa.ChannelByID(msg.ChannelID()) + if channel != nil { if channel.Type() == models.ChannelTypeAndroid { if !androidChannelsSeen[channel] { @@ -36,7 +48,8 @@ func SendMessages(ctx context.Context, rt *runtime.Runtime, tx models.Queryer, f } androidChannelsSeen[channel] = true } else { - courierMsgs[msg.ContactID()] = append(courierMsgs[msg.ContactID()], msg) + cc := contactAndChannel{msg.ContactID(), channel} + courierMsgs[cc] = append(courierMsgs[cc], msg) } } else { pending = append(pending, msg) @@ -48,12 +61,12 @@ func SendMessages(ctx context.Context, rt *runtime.Runtime, tx models.Queryer, f rc := rt.RP.Get() defer rc.Close() - for contactID, contactMsgs := range courierMsgs { - err := QueueCourierMessages(rc, contactID, contactMsgs) + for cc, contactMsgs := range courierMsgs { + err := QueueCourierMessages(rc, cc.contactID, cc.channel, contactMsgs) // not being able to queue a message isn't the end of the world, log but don't return an error if err != nil { - logrus.WithField("messages", contactMsgs).WithField("contact", contactID).WithError(err).Error("error queuing messages") + logrus.WithField("messages", contactMsgs).WithField("contact", cc.contactID).WithError(err).Error("error queuing messages") // in the case of errors we do want to change the messages back to pending however so they // get queued later. (for the common case messages are only inserted and queued, without a status update) @@ -78,6 +91,8 @@ func SendMessages(ctx context.Context, rt *runtime.Runtime, tx models.Queryer, f logrus.WithError(err).Error("error marking message as pending") } } + + return nil } func assert(c bool, m string) { From 7113234eb8a4e3422b3920c201345099a27e0c32 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 13 Apr 2023 13:26:52 -0500 Subject: [PATCH 2/2] Remove Msg.Flow and only add to courier payload --- core/models/channels.go | 6 ++ core/models/msgs.go | 71 ++++++-------- core/msgio/courier.go | 38 ++++--- core/msgio/courier_test.go | 196 +++++++++++++++++++++++++++++++++++-- core/msgio/send.go | 9 +- 5 files changed, 260 insertions(+), 60 deletions(-) diff --git a/core/models/channels.go b/core/models/channels.go index eb347555b..ec2903260 100644 --- a/core/models/channels.go +++ b/core/models/channels.go @@ -43,6 +43,7 @@ type Channel struct { c struct { ID ChannelID `json:"id"` UUID assets.ChannelUUID `json:"uuid"` + OrgID OrgID `json:"org_id"` Parent *assets.ChannelReference `json:"parent"` Name string `json:"name"` Address string `json:"address"` @@ -61,6 +62,9 @@ type Channel struct { // ID returns the id of this channel func (c *Channel) ID() ChannelID { return c.c.ID } +// OrgID returns the org id of this channel +func (c *Channel) OrgID() OrgID { return c.c.OrgID } + // UUID returns the UUID of this channel func (c *Channel) UUID() assets.ChannelUUID { return c.c.UUID } @@ -150,6 +154,7 @@ const sqlSelectChannelsByID = ` SELECT ROW_TO_JSON(r) FROM (SELECT c.id as id, c.uuid as uuid, + c.org_id as org_id, c.name as name, c.channel_type as channel_type, COALESCE(c.tps, 10) as tps, @@ -190,6 +195,7 @@ const sqlSelectChannels = ` SELECT ROW_TO_JSON(r) FROM (SELECT c.id as id, c.uuid as uuid, + c.org_id as org_id, (SELECT ROW_TO_JSON(p) FROM (SELECT uuid, name FROM channels_channel cc where cc.id = c.parent_id) p) as parent, c.name as name, c.channel_type as channel_type, diff --git a/core/models/msgs.go b/core/models/msgs.go index f70d3a375..b2cdfc003 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -3,7 +3,6 @@ package models import ( "context" "database/sql/driver" - "encoding/json" "fmt" "strings" "time" @@ -128,11 +127,10 @@ type Msg struct { } // extra data added to the courier payload - Flow *assets.FlowReference `json:"flow,omitempty"` - ResponseToExternalID null.String `json:"response_to_external_id,omitempty"` - IsResend bool `json:"is_resend,omitempty"` - SessionID SessionID `json:"session_id,omitempty"` - SessionStatus SessionStatus `json:"session_status,omitempty"` + ResponseToExternalID null.String `json:"response_to_external_id,omitempty"` + IsResend bool `json:"is_resend,omitempty"` + SessionID SessionID `json:"session_id,omitempty"` + SessionStatus SessionStatus `json:"session_status,omitempty"` // These fields are set on the last outgoing message in a session's sprint. In the case // of the session being at a wait with a timeout then the timeout will be set. It is up to @@ -141,34 +139,34 @@ type Msg struct { SessionTimeout int `json:"session_timeout,omitempty"` } -func (m *Msg) ID() flows.MsgID { return m.m.ID } -func (m *Msg) BroadcastID() BroadcastID { return m.m.BroadcastID } -func (m *Msg) UUID() flows.MsgUUID { return m.m.UUID } -func (m *Msg) Text() string { return m.m.Text } -func (m *Msg) QuickReplies() []string { return m.m.QuickReplies } -func (m *Msg) Locale() envs.Locale { return m.m.Locale } -func (m *Msg) HighPriority() bool { return m.m.HighPriority } -func (m *Msg) CreatedOn() time.Time { return m.m.CreatedOn } -func (m *Msg) ModifiedOn() time.Time { return m.m.ModifiedOn } -func (m *Msg) SentOn() *time.Time { return m.m.SentOn } -func (m *Msg) QueuedOn() time.Time { return m.m.QueuedOn } -func (m *Msg) Direction() MsgDirection { return m.m.Direction } -func (m *Msg) Status() MsgStatus { return m.m.Status } -func (m *Msg) Visibility() MsgVisibility { return m.m.Visibility } -func (m *Msg) Type() MsgType { return m.m.MsgType } -func (m *Msg) ErrorCount() int { return m.m.ErrorCount } -func (m *Msg) NextAttempt() *time.Time { return m.m.NextAttempt } -func (m *Msg) FailedReason() MsgFailedReason { return m.m.FailedReason } -func (m *Msg) ExternalID() null.String { return m.m.ExternalID } -func (m *Msg) Metadata() map[string]interface{} { return m.m.Metadata } -func (m *Msg) MsgCount() int { return m.m.MsgCount } -func (m *Msg) ChannelID() ChannelID { return m.m.ChannelID } -func (m *Msg) URN() urns.URN { return m.m.URN } -func (m *Msg) URNAuth() null.String { return m.m.URNAuth } -func (m *Msg) OrgID() OrgID { return m.m.OrgID } -func (m *Msg) FlowID() FlowID { return m.m.FlowID } -func (m *Msg) ContactID() ContactID { return m.m.ContactID } -func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID } +func (m *Msg) ID() flows.MsgID { return m.m.ID } +func (m *Msg) BroadcastID() BroadcastID { return m.m.BroadcastID } +func (m *Msg) UUID() flows.MsgUUID { return m.m.UUID } +func (m *Msg) Text() string { return m.m.Text } +func (m *Msg) QuickReplies() []string { return m.m.QuickReplies } +func (m *Msg) Locale() envs.Locale { return m.m.Locale } +func (m *Msg) HighPriority() bool { return m.m.HighPriority } +func (m *Msg) CreatedOn() time.Time { return m.m.CreatedOn } +func (m *Msg) ModifiedOn() time.Time { return m.m.ModifiedOn } +func (m *Msg) SentOn() *time.Time { return m.m.SentOn } +func (m *Msg) QueuedOn() time.Time { return m.m.QueuedOn } +func (m *Msg) Direction() MsgDirection { return m.m.Direction } +func (m *Msg) Status() MsgStatus { return m.m.Status } +func (m *Msg) Visibility() MsgVisibility { return m.m.Visibility } +func (m *Msg) Type() MsgType { return m.m.MsgType } +func (m *Msg) ErrorCount() int { return m.m.ErrorCount } +func (m *Msg) NextAttempt() *time.Time { return m.m.NextAttempt } +func (m *Msg) FailedReason() MsgFailedReason { return m.m.FailedReason } +func (m *Msg) ExternalID() null.String { return m.m.ExternalID } +func (m *Msg) Metadata() map[string]any { return m.m.Metadata } +func (m *Msg) MsgCount() int { return m.m.MsgCount } +func (m *Msg) ChannelID() ChannelID { return m.m.ChannelID } +func (m *Msg) URN() urns.URN { return m.m.URN } +func (m *Msg) URNAuth() null.String { return m.m.URNAuth } +func (m *Msg) OrgID() OrgID { return m.m.OrgID } +func (m *Msg) FlowID() FlowID { return m.m.FlowID } +func (m *Msg) ContactID() ContactID { return m.m.ContactID } +func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID } func (m *Msg) SetChannel(channel *Channel) { if channel != nil { @@ -207,10 +205,6 @@ func (m *Msg) Attachments() []utils.Attachment { return attachments } -func (m *Msg) MarshalJSON() ([]byte, error) { - return json.Marshal(m.m) -} - // NewIncomingIVR creates a new incoming IVR message for the passed in text and attachment func NewIncomingIVR(cfg *runtime.Config, orgID OrgID, call *Call, in *flows.MsgIn, createdOn time.Time) *Msg { msg := &Msg{} @@ -348,7 +342,6 @@ func newOutgoingTextMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact if flow != nil { m.FlowID = flow.ID() - msg.Flow = flow.Reference() } // if we're responding to an incoming message, send as high priority diff --git a/core/msgio/courier.go b/core/msgio/courier.go index 16e25f92c..f56fce027 100644 --- a/core/msgio/courier.go +++ b/core/msgio/courier.go @@ -20,7 +20,6 @@ import ( "github.com/nyaruka/goflow/utils" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/runtime" - "github.com/nyaruka/null/v2" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -34,6 +33,7 @@ const ( highPriority = 1 ) +// Msg is the format of a message queued to courier type Msg struct { ID flows.MsgID `json:"id"` UUID flows.MsgUUID `json:"uuid"` @@ -53,12 +53,12 @@ type Msg struct { ContactID models.ContactID `json:"contact_id"` ContactURNID *models.URNID `json:"contact_urn_id"` URN urns.URN `json:"urn"` - URNAuth null.String `json:"urn_auth,omitempty"` + URNAuth string `json:"urn_auth,omitempty"` SentOn *time.Time `json:"sent_on"` QueuedOn time.Time `json:"queued_on"` ErrorCount int `json:"error_count"` NextAttempt *time.Time `json:"next_attempt"` - Metadata null.Map `json:"metadata,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` Flow *assets.FlowReference `json:"flow,omitempty"` ResponseToExternalID string `json:"response_to_external_id,omitempty"` IsResend bool `json:"is_resend,omitempty"` @@ -69,7 +69,17 @@ type Msg struct { SessionTimeout int `json:"session_timeout,omitempty"` } -func NewMsg(m *models.Msg, ch *models.Channel) *Msg { +// NewCourierMsg creates a courier message in the format it's expecting to be queued +func NewCourierMsg(oa *models.OrgAssets, m *models.Msg, channel *models.Channel) (*Msg, error) { + var flowRef *assets.FlowReference + if m.FlowID() != models.NilFlowID { + flow, err := oa.FlowByID(m.FlowID()) + if err != nil { + return nil, errors.Wrap(err, "error loading message flow") + } + flowRef = flow.Reference() + } + return &Msg{ ID: m.ID(), UUID: m.UUID(), @@ -85,24 +95,24 @@ func NewMsg(m *models.Msg, ch *models.Channel) *Msg { CreatedOn: m.CreatedOn(), ModifiedOn: m.ModifiedOn(), ChannelID: m.ChannelID(), - ChannelUUID: ch.UUID(), + ChannelUUID: channel.UUID(), ContactID: m.ContactID(), ContactURNID: m.ContactURNID(), URN: m.URN(), - URNAuth: m.URNAuth(), + URNAuth: string(m.URNAuth()), SentOn: m.SentOn(), QueuedOn: m.QueuedOn(), ErrorCount: m.ErrorCount(), NextAttempt: m.NextAttempt(), Metadata: m.Metadata(), - Flow: m.Flow, + Flow: flowRef, ResponseToExternalID: string(m.ResponseToExternalID), IsResend: m.IsResend, SessionID: m.SessionID, SessionStatus: m.SessionStatus, SessionWaitStartedOn: m.SessionWaitStartedOn, SessionTimeout: m.SessionTimeout, - } + }, nil } var queuePushScript = redis.NewScript(6, ` @@ -135,7 +145,7 @@ end `) // PushCourierBatch pushes a batch of messages for a single contact and channel onto the appropriate courier queue -func PushCourierBatch(rc redis.Conn, ch *models.Channel, msgs []*models.Msg, timestamp string) error { +func PushCourierBatch(rc redis.Conn, oa *models.OrgAssets, ch *models.Channel, msgs []*models.Msg, timestamp string) error { priority := bulkPriority if msgs[0].HighPriority() { priority = highPriority @@ -143,7 +153,11 @@ func PushCourierBatch(rc redis.Conn, ch *models.Channel, msgs []*models.Msg, tim batch := make([]*Msg, len(msgs)) for i, m := range msgs { - batch[i] = NewMsg(m, ch) + var err error + batch[i], err = NewCourierMsg(oa, m, ch) + if err != nil { + return errors.Wrap(err, "error creating courier message") + } } batchJSON := jsonx.MustMarshal(batch) @@ -153,7 +167,7 @@ func PushCourierBatch(rc redis.Conn, ch *models.Channel, msgs []*models.Msg, tim } // QueueCourierMessages queues messages for a single contact to Courier -func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, channel *models.Channel, msgs []*models.Msg) error { +func QueueCourierMessages(rc redis.Conn, oa *models.OrgAssets, contactID models.ContactID, channel *models.Channel, msgs []*models.Msg) error { if len(msgs) == 0 { return nil } @@ -172,7 +186,7 @@ func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, channel *mo commitBatch := func() error { if len(batch) > 0 { start := time.Now() - err := PushCourierBatch(rc, channel, batch, epochSeconds) + err := PushCourierBatch(rc, oa, channel, batch, epochSeconds) if err != nil { return err } diff --git a/core/msgio/courier_test.go b/core/msgio/courier_test.go index fb4f94cc0..2ee982631 100644 --- a/core/msgio/courier_test.go +++ b/core/msgio/courier_test.go @@ -2,19 +2,201 @@ package msgio_test import ( "encoding/json" + "fmt" "testing" + "time" "github.com/gomodule/redigo/redis" + "github.com/nyaruka/gocommon/dbutil/assertdb" "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/gocommon/urns" + "github.com/nyaruka/goflow/assets" + "github.com/nyaruka/goflow/envs" + "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/test" + "github.com/nyaruka/goflow/utils" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/core/msgio" "github.com/nyaruka/mailroom/testsuite" "github.com/nyaruka/mailroom/testsuite/testdata" + "github.com/nyaruka/null/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func TestNewCourierMsg(t *testing.T) { + ctx, rt := testsuite.Runtime() + + defer testsuite.Reset(testsuite.ResetData) + + assertdb.Query(t, rt.DB, `SELECT count(*) FROM orgs_org WHERE is_suspended = TRUE`).Returns(0) + + oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID) + require.NoError(t, err) + require.False(t, oa.Org().Suspended()) + + channel := oa.ChannelByUUID(testdata.TwilioChannel.UUID) + flow, _ := oa.FlowByID(testdata.Favorites.ID) + urn := urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", testdata.Cathy.URNID)) + flowMsg1 := flows.NewMsgOut( + urn, + assets.NewChannelReference(testdata.TwilioChannel.UUID, "Test Channel"), + "Hi there", + []utils.Attachment{utils.Attachment("image/jpeg:https://dl-foo.com/image.jpg")}, + []string{"yes", "no"}, + flows.NewMsgTemplating(assets.NewTemplateReference("4474d39c-ac2c-486d-bceb-8a774a515299", "tpl"), []string{"name"}, "tpls"), + flows.MsgTopicPurchase, + envs.Locale(`eng-US`), + flows.NilUnsendableReason, + ) + + // create a non-priority flow message.. i.e. the session isn't responding to an incoming message + testdata.InsertWaitingSession(rt, testdata.Org1, testdata.Cathy, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, nil) + _, flowContact := testdata.Cathy.Load(rt, oa) + session, err := models.FindWaitingSessionForContact(ctx, rt.DB, rt.SessionStorage, oa, models.FlowTypeMessaging, flowContact) + require.NoError(t, err) + + msg1, err := models.NewOutgoingFlowMsg(rt, oa.Org(), channel, session, flow, flowMsg1, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC)) + require.NoError(t, err) + + cathy := session.Contact() + + err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg1}) + require.NoError(t, err) + + cmsg1, err := msgio.NewCourierMsg(oa, msg1, channel) + assert.NoError(t, err) + + marshaled := jsonx.MustMarshal(cmsg1) + + test.AssertEqualJSON(t, []byte(fmt.Sprintf(`{ + "attachments": [ + "image/jpeg:https://dl-foo.com/image.jpg" + ], + "channel_id": 10000, + "channel_uuid": "74729f45-7f29-4868-9dc4-90e491e3c7d8", + "contact_id": 10000, + "contact_urn_id": 10000, + "created_on": "2021-11-09T14:03:30Z", + "direction": "O", + "error_count": 0, + "flow": {"uuid": "9de3663f-c5c5-4c92-9f45-ecbc09abcc85", "name": "Favorites"}, + "high_priority": false, + "id": %d, + "locale": "eng-US", + "metadata": { + "templating": { + "namespace": "tpls", + "template": {"name": "tpl", "uuid": "4474d39c-ac2c-486d-bceb-8a774a515299"}, + "variables": ["name"] + }, + "topic": "purchase" + }, + "modified_on": %s, + "next_attempt": null, + "org_id": 1, + "queued_on": %s, + "quick_replies": [ + "yes", + "no" + ], + "sent_on": null, + "session_id": %d, + "session_status": "W", + "status": "Q", + "text": "Hi there", + "tps_cost": 2, + "urn": "tel:+250700000001?id=10000", + "uuid": "%s" + }`, msg1.ID(), jsonx.MustMarshal(msg1.ModifiedOn()), jsonx.MustMarshal(msg1.QueuedOn()), session.ID(), msg1.UUID())), marshaled) + + // create a priority flow message.. i.e. the session is responding to an incoming message + flowMsg2 := flows.NewMsgOut( + urn, + assets.NewChannelReference(testdata.TwilioChannel.UUID, "Test Channel"), + "Hi there", + nil, nil, nil, + flows.NilMsgTopic, + envs.NilLocale, + flows.NilUnsendableReason, + ) + in1 := testdata.InsertIncomingMsg(rt, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "test", models.MsgStatusHandled) + session.SetIncomingMsg(models.MsgID(in1.ID()), null.String("EX123")) + msg2, err := models.NewOutgoingFlowMsg(rt, oa.Org(), channel, session, flow, flowMsg2, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC)) + require.NoError(t, err) + + err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg2}) + require.NoError(t, err) + + cmsg2, err := msgio.NewCourierMsg(oa, msg2, channel) + assert.NoError(t, err) + + marshaled = jsonx.MustMarshal(cmsg2) + + test.AssertEqualJSON(t, []byte(fmt.Sprintf(`{ + "channel_id": 10000, + "channel_uuid": "74729f45-7f29-4868-9dc4-90e491e3c7d8", + "contact_id": 10000, + "contact_urn_id": 10000, + "created_on": "2021-11-09T14:03:30Z", + "direction": "O", + "error_count": 0, + "flow": {"uuid": "9de3663f-c5c5-4c92-9f45-ecbc09abcc85", "name": "Favorites"}, + "response_to_external_id": "EX123", + "high_priority": true, + "id": %d, + "modified_on": %s, + "next_attempt": null, + "org_id": 1, + "queued_on": %s, + "sent_on": null, + "session_id": %d, + "session_status": "W", + "status": "Q", + "text": "Hi there", + "tps_cost": 1, + "urn": "tel:+250700000001?id=10000", + "uuid": "%s" + }`, msg2.ID(), jsonx.MustMarshal(msg2.ModifiedOn()), jsonx.MustMarshal(msg2.QueuedOn()), session.ID(), msg2.UUID())), marshaled) + + // try a broadcast message which won't have session and flow fields set + bcastID := testdata.InsertBroadcast(rt, testdata.Org1, `eng`, map[envs.Language]string{`eng`: "Blast"}, models.NilScheduleID, []*testdata.Contact{testdata.Cathy}, nil) + bcastMsg1 := flows.NewMsgOut(urn, assets.NewChannelReference(testdata.TwilioChannel.UUID, "Test Channel"), "Blast", nil, nil, nil, flows.NilMsgTopic, envs.NilLocale, flows.NilUnsendableReason) + msg3, err := models.NewOutgoingBroadcastMsg(rt, oa.Org(), channel, cathy, bcastMsg1, time.Date(2021, 11, 9, 14, 3, 30, 0, time.UTC), &models.BroadcastBatch{BroadcastID: bcastID, CreatedByID: testdata.Admin.ID}) + require.NoError(t, err) + + err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg3}) + require.NoError(t, err) + + cmsg3, err := msgio.NewCourierMsg(oa, msg3, channel) + assert.NoError(t, err) + + marshaled = jsonx.MustMarshal(cmsg3) + + test.AssertEqualJSON(t, []byte(fmt.Sprintf(`{ + "channel_id": 10000, + "channel_uuid": "74729f45-7f29-4868-9dc4-90e491e3c7d8", + "contact_id": 10000, + "contact_urn_id": 10000, + "created_on": "2021-11-09T14:03:30Z", + "direction": "O", + "error_count": 0, + "high_priority": false, + "id": %d, + "modified_on": %s, + "next_attempt": null, + "org_id": 1, + "queued_on": %s, + "sent_on": null, + "status": "Q", + "text": "Blast", + "tps_cost": 1, + "urn": "tel:+250700000001?id=10000", + "uuid": "%s" + }`, msg3.ID(), jsonx.MustMarshal(msg3.ModifiedOn()), jsonx.MustMarshal(msg3.QueuedOn()), msg3.UUID())), marshaled) +} + func TestQueueCourierMessages(t *testing.T) { ctx, rt := testsuite.Runtime() rc := rt.RP.Get() @@ -28,7 +210,7 @@ func TestQueueCourierMessages(t *testing.T) { twilio := oa.ChannelByUUID(testdata.TwilioChannel.UUID) // noop if no messages provided - msgio.QueueCourierMessages(rc, testdata.Cathy.ID, twilio, []*models.Msg{}) + msgio.QueueCourierMessages(rc, oa, testdata.Cathy.ID, twilio, []*models.Msg{}) testsuite.AssertCourierQueues(t, map[string][]int{}) // queue 3 messages for Cathy.. @@ -38,7 +220,7 @@ func TestQueueCourierMessages(t *testing.T) { (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy, HighPriority: true}).createMsg(t, rt, oa), } - msgio.QueueCourierMessages(rc, testdata.Cathy.ID, twilio, msgs) + msgio.QueueCourierMessages(rc, oa, testdata.Cathy.ID, twilio, msgs) testsuite.AssertCourierQueues(t, map[string][]int{ "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {2}, // twilio, bulk priority @@ -60,14 +242,14 @@ func TestClearChannelCourierQueue(t *testing.T) { vonage := oa.ChannelByUUID(testdata.VonageChannel.UUID) // queue 3 Twilio messages for Cathy.. - msgio.QueueCourierMessages(rc, testdata.Cathy.ID, twilio, []*models.Msg{ + msgio.QueueCourierMessages(rc, oa, testdata.Cathy.ID, twilio, []*models.Msg{ (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa), (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa), (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy, HighPriority: true}).createMsg(t, rt, oa), }) // and a Vonage message - msgio.QueueCourierMessages(rc, testdata.Cathy.ID, vonage, []*models.Msg{ + msgio.QueueCourierMessages(rc, oa, testdata.Cathy.ID, vonage, []*models.Msg{ (&msgSpec{Channel: testdata.VonageChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa), }) @@ -105,7 +287,7 @@ func TestPushCourierBatch(t *testing.T) { msg1 := (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa) msg2 := (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa) - err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg1, msg2}, "1636557205.123456") + err = msgio.PushCourierBatch(rc, oa, channel, []*models.Msg{msg1, msg2}, "1636557205.123456") require.NoError(t, err) // check that channel has been added to active list @@ -132,7 +314,7 @@ func TestPushCourierBatch(t *testing.T) { msg3 := (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa) - err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg3}, "1636557205.234567") + err = msgio.PushCourierBatch(rc, oa, channel, []*models.Msg{msg3}, "1636557205.234567") require.NoError(t, err) queued, err = redis.ByteSlices(rc.Do("ZRANGE", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0", 0, -1)) @@ -145,7 +327,7 @@ func TestPushCourierBatch(t *testing.T) { msg4 := (&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa) - err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg4}, "1636557205.345678") + err = msgio.PushCourierBatch(rc, oa, channel, []*models.Msg{msg4}, "1636557205.345678") require.NoError(t, err) // check that channel has *not* been added to active list diff --git a/core/msgio/send.go b/core/msgio/send.go index 387309ed8..67739d88b 100644 --- a/core/msgio/send.go +++ b/core/msgio/send.go @@ -36,7 +36,7 @@ func SendMessages(ctx context.Context, rt *runtime.Runtime, tx models.Queryer, f oa, err := models.GetOrgAssets(ctx, rt, msg.OrgID()) if err != nil { - return errors.Wrap(err, "unable to load org assets") + return errors.Wrap(err, "error getting org assets") } channel := oa.ChannelByID(msg.ChannelID()) @@ -62,7 +62,12 @@ func SendMessages(ctx context.Context, rt *runtime.Runtime, tx models.Queryer, f defer rc.Close() for cc, contactMsgs := range courierMsgs { - err := QueueCourierMessages(rc, cc.contactID, cc.channel, contactMsgs) + oa, err := models.GetOrgAssets(ctx, rt, cc.channel.OrgID()) + if err != nil { + return errors.Wrap(err, "error getting org assets") + } + + err = QueueCourierMessages(rc, oa, cc.contactID, cc.channel, contactMsgs) // not being able to queue a message isn't the end of the world, log but don't return an error if err != nil {