Skip to content

Commit

Permalink
Merge pull request #339 from nyaruka/bcast_tasks_pt2
Browse files Browse the repository at this point in the history
2/3 Start reading from .broadcast field on batch tasks
  • Loading branch information
rowanseymour authored Sep 23, 2024
2 parents 81d44ee + 5d4d726 commit fe7a0a2
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 18 deletions.
22 changes: 18 additions & 4 deletions core/models/broadcasts.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,20 @@ RETURNING id`
const sqlInsertBroadcastContacts = `INSERT INTO msgs_broadcast_contacts(broadcast_id, contact_id) VALUES(:broadcast_id, :contact_id)`
const sqlInsertBroadcastGroups = `INSERT INTO msgs_broadcast_groups(broadcast_id, contactgroup_id) VALUES(:broadcast_id, :contactgroup_id)`

const sqlGetBroadcastByID = `
SELECT id, org_id, status, translations, base_language, expressions, optin_id, template_id, template_variables, created_by_id
FROM msgs_broadcast
WHERE id = $1`

// GetBroadcastByID gets a broadcast by it's ID - NOTE this does not load all attributes of the broadcast
func GetBroadcastByID(ctx context.Context, db DBorTx, bcastID BroadcastID) (*Broadcast, error) {
b := &Broadcast{}
if err := db.GetContext(ctx, b, sqlGetBroadcastByID, bcastID); err != nil {
return nil, fmt.Errorf("error loading broadcast #%d: %w", bcastID, err)
}
return b, nil
}

// BroadcastBatch represents a batch of contacts that need messages sent for
type BroadcastBatch struct {
// for persisted starts broadcast_id is set, for non-persisted broadcasts like flow actions, broadcast is set
Expand Down Expand Up @@ -315,10 +329,10 @@ func (b *BroadcastBatch) createMessage(rt *runtime.Runtime, oa *OrgAssets, c *Co
return nil, fmt.Errorf("error creating flow contact for broadcast message: %w", err)
}

content, locale := b.Translations.ForContact(oa.Env(), contact, b.BaseLanguage)
content, locale := b.Broadcast.Translations.ForContact(oa.Env(), contact, b.Broadcast.BaseLanguage)

var expressionsContext *types.XObject
if b.Expressions {
if b.Broadcast.Expressions {
expressionsContext = types.NewXObject(map[string]types.XValue{
"contact": flows.Context(oa.Env(), contact),
"fields": flows.Context(oa.Env(), contact.Fields()),
Expand All @@ -333,9 +347,9 @@ func (b *BroadcastBatch) createMessage(rt *runtime.Runtime, oa *OrgAssets, c *Co
}

// create our outgoing message
out, ch := CreateMsgOut(rt, oa, contact, content, b.TemplateID, b.TemplateVariables, locale, expressionsContext)
out, ch := CreateMsgOut(rt, oa, contact, content, b.Broadcast.TemplateID, b.Broadcast.TemplateVariables, locale, expressionsContext)

msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), ch, contact, out, b)
msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), ch, contact, out, b.Broadcast)
if err != nil {
return nil, fmt.Errorf("error creating outgoing message: %w", err)
}
Expand Down
21 changes: 11 additions & 10 deletions core/models/broadcasts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,7 @@ func TestNonPersistentBroadcasts(t *testing.T) {
batch := bcast.CreateBatch([]models.ContactID{testdata.Alexandria.ID, testdata.Bob.ID}, false)

assert.Equal(t, models.NilBroadcastID, batch.BroadcastID)
assert.Equal(t, testdata.Org1.ID, batch.OrgID)
assert.Equal(t, i18n.Language("eng"), batch.BaseLanguage)
assert.Equal(t, translations, batch.Translations)
assert.Equal(t, optIn.ID, batch.OptInID)
assert.NotNil(t, testdata.Org1.ID, batch.Broadcast)
assert.Equal(t, []models.ContactID{testdata.Alexandria.ID, testdata.Bob.ID}, batch.ContactIDs)

oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
Expand All @@ -153,9 +150,6 @@ func TestBroadcastBatchCreateMessage(t *testing.T) {
oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshOptIns)
require.NoError(t, err)

// we need a broadcast id to insert messages but the content here is ignored
bcastID := testdata.InsertBroadcast(rt, testdata.Org1, "eng", map[i18n.Language]string{"eng": "Test"}, nil, models.NilScheduleID, nil, nil)

tcs := []struct {
contactLanguage i18n.Language
contactURN urns.URN
Expand Down Expand Up @@ -249,16 +243,23 @@ func TestBroadcastBatchCreateMessage(t *testing.T) {
contact := testdata.InsertContact(rt, testdata.Org1, flows.ContactUUID(uuids.NewV4()), "Felix", tc.contactLanguage, models.ContactStatusActive)
testdata.InsertContactURN(rt, testdata.Org1, contact, tc.contactURN, 1000, nil)

batch := &models.BroadcastBatch{
BroadcastID: bcastID,
bcast := &models.Broadcast{
OrgID: testdata.Org1.ID,
Translations: tc.translations,
BaseLanguage: tc.baseLanguage,
Expressions: tc.expressions,
OptInID: tc.optInID,
TemplateID: tc.templateID,
TemplateVariables: tc.templateVariables,
ContactIDs: []models.ContactID{contact.ID},
}
err = models.InsertBroadcast(ctx, rt.DB, bcast)
require.NoError(t, err)

batch := &models.BroadcastBatch{
BroadcastID: bcast.ID,
Broadcast: bcast,
ContactIDs: []models.ContactID{contact.ID},
IsLast: true,
}

msgs, err := batch.CreateMessages(ctx, rt, oa)
Expand Down
4 changes: 2 additions & 2 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ func NewOutgoingFlowMsg(rt *runtime.Runtime, org *Org, channel *Channel, session
}

// NewOutgoingBroadcastMsg creates an outgoing message which is part of a broadcast
func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, bb *BroadcastBatch) (*Msg, error) {
return newOutgoingTextMsg(rt, org, channel, contact, out, nil, nil, bb.BroadcastID, NilTicketID, bb.OptInID, bb.CreatedByID, dates.Now())
func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, b *Broadcast) (*Msg, error) {
return newOutgoingTextMsg(rt, org, channel, contact, out, nil, nil, b.ID, NilTicketID, b.OptInID, b.CreatedByID, dates.Now())
}

// NewOutgoingTicketMsg creates an outgoing message from a ticket
Expand Down
2 changes: 1 addition & 1 deletion core/msgio/courier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestNewCourierMsg(t *testing.T) {
// try a broadcast message which won't have session and flow fields set and won't be high priority
bcastID := testdata.InsertBroadcast(rt, testdata.Org1, `eng`, map[i18n.Language]string{`eng`: "Blast"}, nil, models.NilScheduleID, []*testdata.Contact{testFred}, nil)
bcastMsg1 := flows.NewMsgOut(fredURN, assets.NewChannelReference(testdata.TwilioChannel.UUID, "Test Channel"), &flows.MsgContent{Text: "Blast"}, nil, flows.NilMsgTopic, i18n.NilLocale, flows.NilUnsendableReason)
msg3, err := models.NewOutgoingBroadcastMsg(rt, oa.Org(), twilio, fred, bcastMsg1, &models.BroadcastBatch{BroadcastID: bcastID, OptInID: optInID, CreatedByID: testdata.Admin.ID})
msg3, err := models.NewOutgoingBroadcastMsg(rt, oa.Org(), twilio, fred, bcastMsg1, &models.Broadcast{ID: bcastID, OptInID: optInID, CreatedByID: testdata.Admin.ID})
require.NoError(t, err)

err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg3})
Expand Down
20 changes: 19 additions & 1 deletion core/tasks/msgs/send_broadcast_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ func (t *SendBroadcastBatchTask) WithAssets() models.Refresh {
}

func (t *SendBroadcastBatchTask) Perform(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets) error {
var bcast *models.Broadcast
var err error

// if this batch belongs to a persisted broadcast, fetch it
if t.BroadcastID != models.NilBroadcastID {
bcast, err = models.GetBroadcastByID(ctx, rt.DB, t.BroadcastID)
if err != nil {
return fmt.Errorf("error loading flow start for batch: %w", err)
}
} else {
bcast = t.Broadcast // otherwise use broadcast from the task
}

// if this broadcast was interrupted, we're done
if bcast.Status == models.BroadcastStatusInterrupted {
return nil
}

// create this batch of messages
msgs, err := t.BroadcastBatch.CreateMessages(ctx, rt, oa)
if err != nil {
Expand All @@ -46,7 +64,7 @@ func (t *SendBroadcastBatchTask) Perform(ctx context.Context, rt *runtime.Runtim

// if this is our last batch, mark broadcast as done
if t.IsLast {
if err := (&models.Broadcast{ID: t.BroadcastBatch.BroadcastID}).SetCompleted(ctx, rt.DB); err != nil {
if err := bcast.SetCompleted(ctx, rt.DB); err != nil {
return fmt.Errorf("error marking broadcast as complete: %w", err)
}
}
Expand Down

0 comments on commit fe7a0a2

Please sign in to comment.