Skip to content

Commit

Permalink
Merge pull request #54 from nyaruka/simpler_sending
Browse files Browse the repository at this point in the history
Rework message marshaling for courier
  • Loading branch information
rowanseymour authored Apr 13, 2023
2 parents 68c0509 + 7113234 commit 9f7a24d
Show file tree
Hide file tree
Showing 6 changed files with 420 additions and 338 deletions.
6 changes: 6 additions & 0 deletions core/models/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Channel struct {
c struct {
ID ChannelID `json:"id"`
UUID assets.ChannelUUID `json:"uuid"`
OrgID OrgID `json:"org_id"`
Parent *assets.ChannelReference `json:"parent"`
Name string `json:"name"`
Address string `json:"address"`
Expand All @@ -61,6 +62,9 @@ type Channel struct {
// ID returns the id of this channel
func (c *Channel) ID() ChannelID { return c.c.ID }

// OrgID returns the org id of this channel
func (c *Channel) OrgID() OrgID { return c.c.OrgID }

// UUID returns the UUID of this channel
func (c *Channel) UUID() assets.ChannelUUID { return c.c.UUID }

Expand Down Expand Up @@ -150,6 +154,7 @@ const sqlSelectChannelsByID = `
SELECT ROW_TO_JSON(r) FROM (SELECT
c.id as id,
c.uuid as uuid,
c.org_id as org_id,
c.name as name,
c.channel_type as channel_type,
COALESCE(c.tps, 10) as tps,
Expand Down Expand Up @@ -190,6 +195,7 @@ const sqlSelectChannels = `
SELECT ROW_TO_JSON(r) FROM (SELECT
c.id as id,
c.uuid as uuid,
c.org_id as org_id,
(SELECT ROW_TO_JSON(p) FROM (SELECT uuid, name FROM channels_channel cc where cc.id = c.parent_id) p) as parent,
c.name as name,
c.channel_type as channel_type,
Expand Down
198 changes: 81 additions & 117 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package models
import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -89,95 +88,91 @@ var unsendableToFailedReason = map[flows.UnsendableReason]MsgFailedReason{
// Msg is our type for mailroom messages
type Msg struct {
m struct {
ID flows.MsgID `db:"id" json:"id"`
BroadcastID BroadcastID `db:"broadcast_id" json:"broadcast_id,omitempty"`
UUID flows.MsgUUID `db:"uuid" json:"uuid"`
Text string `db:"text" json:"text"`
Attachments pq.StringArray `db:"attachments" json:"attachments,omitempty"`
QuickReplies pq.StringArray `db:"quick_replies" json:"quick_replies,omitempty"`
Locale envs.Locale `db:"locale" json:"locale,omitempty"`
HighPriority bool `db:"high_priority" json:"high_priority"`
CreatedOn time.Time `db:"created_on" json:"created_on"`
ModifiedOn time.Time `db:"modified_on" json:"modified_on"`
SentOn *time.Time `db:"sent_on" json:"sent_on"`
QueuedOn time.Time `db:"queued_on" json:"queued_on"`
Direction MsgDirection `db:"direction" json:"direction"`
Status MsgStatus `db:"status" json:"status"`
Visibility MsgVisibility `db:"visibility" json:"-"`
MsgType MsgType `db:"msg_type" json:"-"`
MsgCount int `db:"msg_count" json:"tps_cost"`
ErrorCount int `db:"error_count" json:"error_count"`
NextAttempt *time.Time `db:"next_attempt" json:"next_attempt"`
FailedReason MsgFailedReason `db:"failed_reason" json:"-"`
ExternalID null.String `db:"external_id" json:"-"`
ResponseToExternalID null.String ` json:"response_to_external_id,omitempty"`
Metadata null.Map `db:"metadata" json:"metadata,omitempty"`
ChannelID ChannelID `db:"channel_id" json:"channel_id"`
ChannelUUID assets.ChannelUUID ` json:"channel_uuid"`
ContactID ContactID `db:"contact_id" json:"contact_id"`
ContactURNID *URNID `db:"contact_urn_id" json:"contact_urn_id"`
IsResend bool ` json:"is_resend,omitempty"`
URN urns.URN `db:"urn_urn" json:"urn"`
URNAuth null.String `db:"urn_auth" json:"urn_auth,omitempty"`
OrgID OrgID `db:"org_id" json:"org_id"`
FlowID FlowID `db:"flow_id" json:"-"`
CreatedByID UserID `db:"created_by_id" json:"-"`

// extra data from handling added to the courier payload
SessionID SessionID `json:"session_id,omitempty"`
SessionStatus SessionStatus `json:"session_status,omitempty"`
Flow *assets.FlowReference `json:"flow,omitempty"`

// These fields are set on the last outgoing message in a session's sprint. In the case
// of the session being at a wait with a timeout then the timeout will be set. It is up to
// Courier to update the session's timeout appropriately after sending the message.
SessionWaitStartedOn *time.Time `json:"session_wait_started_on,omitempty"`
SessionTimeout int `json:"session_timeout,omitempty"`
ID flows.MsgID `db:"id"`
UUID flows.MsgUUID `db:"uuid"`
OrgID OrgID `db:"org_id"`

// origin
BroadcastID BroadcastID `db:"broadcast_id"`
FlowID FlowID `db:"flow_id"`
CreatedByID UserID `db:"created_by_id"`

// content
Text string `db:"text"`
Attachments pq.StringArray `db:"attachments"`
QuickReplies pq.StringArray `db:"quick_replies"`
Locale envs.Locale `db:"locale"`

HighPriority bool `db:"high_priority"`
Direction MsgDirection `db:"direction"`
Status MsgStatus `db:"status"`
Visibility MsgVisibility `db:"visibility"`
MsgType MsgType `db:"msg_type"`
MsgCount int `db:"msg_count"`
CreatedOn time.Time `db:"created_on"`
ModifiedOn time.Time `db:"modified_on"`
ExternalID null.String `db:"external_id"`
Metadata null.Map `db:"metadata"`
ChannelID ChannelID `db:"channel_id"`
ContactID ContactID `db:"contact_id"`
ContactURNID *URNID `db:"contact_urn_id"`
URN urns.URN `db:"urn_urn"`
URNAuth null.String `db:"urn_auth"`

SentOn *time.Time `db:"sent_on"`
QueuedOn time.Time `db:"queued_on"`
ErrorCount int `db:"error_count"`
NextAttempt *time.Time `db:"next_attempt"`
FailedReason MsgFailedReason `db:"failed_reason"`
}

channel *Channel
// extra data added to the courier payload
ResponseToExternalID null.String `json:"response_to_external_id,omitempty"`
IsResend bool `json:"is_resend,omitempty"`
SessionID SessionID `json:"session_id,omitempty"`
SessionStatus SessionStatus `json:"session_status,omitempty"`

// These fields are set on the last outgoing message in a session's sprint. In the case
// of the session being at a wait with a timeout then the timeout will be set. It is up to
// Courier to update the session's timeout appropriately after sending the message.
SessionWaitStartedOn *time.Time `json:"session_wait_started_on,omitempty"`
SessionTimeout int `json:"session_timeout,omitempty"`
}

func (m *Msg) ID() flows.MsgID { return m.m.ID }
func (m *Msg) BroadcastID() BroadcastID { return m.m.BroadcastID }
func (m *Msg) UUID() flows.MsgUUID { return m.m.UUID }
func (m *Msg) Channel() *Channel { return m.channel }
func (m *Msg) Text() string { return m.m.Text }
func (m *Msg) QuickReplies() []string { return m.m.QuickReplies }
func (m *Msg) Locale() envs.Locale { return m.m.Locale }
func (m *Msg) HighPriority() bool { return m.m.HighPriority }
func (m *Msg) CreatedOn() time.Time { return m.m.CreatedOn }
func (m *Msg) ModifiedOn() time.Time { return m.m.ModifiedOn }
func (m *Msg) SentOn() *time.Time { return m.m.SentOn }
func (m *Msg) QueuedOn() time.Time { return m.m.QueuedOn }
func (m *Msg) Direction() MsgDirection { return m.m.Direction }
func (m *Msg) Status() MsgStatus { return m.m.Status }
func (m *Msg) Visibility() MsgVisibility { return m.m.Visibility }
func (m *Msg) Type() MsgType { return m.m.MsgType }
func (m *Msg) ErrorCount() int { return m.m.ErrorCount }
func (m *Msg) NextAttempt() *time.Time { return m.m.NextAttempt }
func (m *Msg) FailedReason() MsgFailedReason { return m.m.FailedReason }
func (m *Msg) ExternalID() null.String { return m.m.ExternalID }
func (m *Msg) Metadata() map[string]interface{} { return m.m.Metadata }
func (m *Msg) MsgCount() int { return m.m.MsgCount }
func (m *Msg) ChannelID() ChannelID { return m.m.ChannelID }
func (m *Msg) ChannelUUID() assets.ChannelUUID { return m.m.ChannelUUID }
func (m *Msg) URN() urns.URN { return m.m.URN }
func (m *Msg) URNAuth() null.String { return m.m.URNAuth }
func (m *Msg) OrgID() OrgID { return m.m.OrgID }
func (m *Msg) FlowID() FlowID { return m.m.FlowID }
func (m *Msg) ContactID() ContactID { return m.m.ContactID }
func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID }
func (m *Msg) IsResend() bool { return m.m.IsResend }
func (m *Msg) ID() flows.MsgID { return m.m.ID }
func (m *Msg) BroadcastID() BroadcastID { return m.m.BroadcastID }
func (m *Msg) UUID() flows.MsgUUID { return m.m.UUID }
func (m *Msg) Text() string { return m.m.Text }
func (m *Msg) QuickReplies() []string { return m.m.QuickReplies }
func (m *Msg) Locale() envs.Locale { return m.m.Locale }
func (m *Msg) HighPriority() bool { return m.m.HighPriority }
func (m *Msg) CreatedOn() time.Time { return m.m.CreatedOn }
func (m *Msg) ModifiedOn() time.Time { return m.m.ModifiedOn }
func (m *Msg) SentOn() *time.Time { return m.m.SentOn }
func (m *Msg) QueuedOn() time.Time { return m.m.QueuedOn }
func (m *Msg) Direction() MsgDirection { return m.m.Direction }
func (m *Msg) Status() MsgStatus { return m.m.Status }
func (m *Msg) Visibility() MsgVisibility { return m.m.Visibility }
func (m *Msg) Type() MsgType { return m.m.MsgType }
func (m *Msg) ErrorCount() int { return m.m.ErrorCount }
func (m *Msg) NextAttempt() *time.Time { return m.m.NextAttempt }
func (m *Msg) FailedReason() MsgFailedReason { return m.m.FailedReason }
func (m *Msg) ExternalID() null.String { return m.m.ExternalID }
func (m *Msg) Metadata() map[string]any { return m.m.Metadata }
func (m *Msg) MsgCount() int { return m.m.MsgCount }
func (m *Msg) ChannelID() ChannelID { return m.m.ChannelID }
func (m *Msg) URN() urns.URN { return m.m.URN }
func (m *Msg) URNAuth() null.String { return m.m.URNAuth }
func (m *Msg) OrgID() OrgID { return m.m.OrgID }
func (m *Msg) FlowID() FlowID { return m.m.FlowID }
func (m *Msg) ContactID() ContactID { return m.m.ContactID }
func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID }

func (m *Msg) SetChannel(channel *Channel) {
m.channel = channel
if channel != nil {
m.m.ChannelID = channel.ID()
m.m.ChannelUUID = channel.UUID()
} else {
m.m.ChannelID = NilChannelID
m.m.ChannelUUID = ""
}
}

Expand Down Expand Up @@ -210,10 +205,6 @@ func (m *Msg) Attachments() []utils.Attachment {
return attachments
}

func (m *Msg) MarshalJSON() ([]byte, error) {
return json.Marshal(m.m)
}

// NewIncomingIVR creates a new incoming IVR message for the passed in text and attachment
func NewIncomingIVR(cfg *runtime.Config, orgID OrgID, call *Call, in *flows.MsgIn, createdOn time.Time) *Msg {
msg := &Msg{}
Expand Down Expand Up @@ -345,13 +336,12 @@ func newOutgoingTextMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact

// if we have a session, set fields on the message from that
if session != nil {
m.ResponseToExternalID = session.IncomingMsgExternalID()
m.SessionID = session.ID()
m.SessionStatus = session.Status()
msg.ResponseToExternalID = session.IncomingMsgExternalID()
msg.SessionID = session.ID()
msg.SessionStatus = session.Status()

if flow != nil {
m.FlowID = flow.ID()
m.Flow = flow.Reference()
}

// if we're responding to an incoming message, send as high priority
Expand Down Expand Up @@ -523,8 +513,6 @@ func loadMessages(ctx context.Context, db Queryer, sql string, params ...interfa
defer rows.Close()

msgs := make([]*Msg, 0)
channelIDsSeen := make(map[ChannelID]bool)
channelIDs := make([]ChannelID, 0, 5)

for rows.Next() {
msg := &Msg{}
Expand All @@ -534,25 +522,6 @@ func loadMessages(ctx context.Context, db Queryer, sql string, params ...interfa
}

msgs = append(msgs, msg)

if msg.ChannelID() != NilChannelID && !channelIDsSeen[msg.ChannelID()] {
channelIDsSeen[msg.ChannelID()] = true
channelIDs = append(channelIDs, msg.ChannelID())
}
}

channels, err := GetChannelsByID(ctx, db, channelIDs)
if err != nil {
return nil, errors.Wrap(err, "error fetching channels for messages")
}

channelsByID := make(map[ChannelID]*Channel)
for _, ch := range channels {
channelsByID[ch.ID()] = ch
}

for _, msg := range msgs {
msg.SetChannel(channelsByID[msg.m.ChannelID])
}

return msgs, nil
Expand All @@ -579,8 +548,8 @@ func NormalizeAttachment(cfg *runtime.Config, attachment utils.Attachment) utils

// SetTimeout sets the timeout for this message
func (m *Msg) SetTimeout(start time.Time, timeout time.Duration) {
m.m.SessionWaitStartedOn = &start
m.m.SessionTimeout = int(timeout / time.Second)
m.SessionWaitStartedOn = &start
m.SessionTimeout = int(timeout / time.Second)
}

// InsertMessages inserts the passed in messages in a single query
Expand Down Expand Up @@ -695,24 +664,19 @@ func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAsse

if ch != nil {
channel := oa.ChannelByUUID(ch.UUID())
msg.channel = channel

msg.m.ChannelID = channel.ID()
msg.m.ChannelUUID = channel.UUID()
msg.m.Status = MsgStatusPending
msg.m.QueuedOn = dates.Now()
msg.m.SentOn = nil
msg.m.ErrorCount = 0
msg.m.FailedReason = ""
msg.m.IsResend = true // mark message as being a resend so it will be queued to courier as such
msg.IsResend = true // mark message as being a resend so it will be queued to courier as such

resends = append(resends, msg.m)
resent = append(resent, msg)
} else {
// if we don't have channel or a URN, fail again
msg.channel = nil
msg.m.ChannelID = NilChannelID
msg.m.ChannelUUID = assets.ChannelUUID("")
msg.m.Status = MsgStatusFailed
msg.m.QueuedOn = dates.Now()
msg.m.SentOn = nil
Expand Down
Loading

0 comments on commit 9f7a24d

Please sign in to comment.