From 915712ff8d25242cec6115feda5297d359149573 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 20 Sep 2024 15:02:29 -0500 Subject: [PATCH] Use broadcast field on batch tasks --- core/models/broadcasts.go | 22 ++++++++++++++++++---- core/models/broadcasts_test.go | 21 +++++++++++---------- core/models/msgs.go | 4 ++-- core/msgio/courier_test.go | 2 +- core/tasks/msgs/send_broadcast_batch.go | 20 +++++++++++++++++++- 5 files changed, 51 insertions(+), 18 deletions(-) diff --git a/core/models/broadcasts.go b/core/models/broadcasts.go index 7f41387dd..af0195489 100644 --- a/core/models/broadcasts.go +++ b/core/models/broadcasts.go @@ -258,6 +258,20 @@ RETURNING id` const sqlInsertBroadcastContacts = `INSERT INTO msgs_broadcast_contacts(broadcast_id, contact_id) VALUES(:broadcast_id, :contact_id)` const sqlInsertBroadcastGroups = `INSERT INTO msgs_broadcast_groups(broadcast_id, contactgroup_id) VALUES(:broadcast_id, :contactgroup_id)` +const sqlGetBroadcastByID = ` +SELECT id, org_id, status, translations, base_language, expressions, optin_id, template_id, template_variables, created_by_id + FROM msgs_broadcast + WHERE id = $1` + +// GetBroadcastByID gets a broadcast by it's ID - NOTE this does not load all attributes of the broadcast +func GetBroadcastByID(ctx context.Context, db DBorTx, bcastID BroadcastID) (*Broadcast, error) { + b := &Broadcast{} + if err := db.GetContext(ctx, b, sqlGetBroadcastByID, bcastID); err != nil { + return nil, fmt.Errorf("error loading broadcast #%d: %w", bcastID, err) + } + return b, nil +} + // BroadcastBatch represents a batch of contacts that need messages sent for type BroadcastBatch struct { // for persisted starts broadcast_id is set, for non-persisted broadcasts like flow actions, broadcast is set @@ -315,10 +329,10 @@ func (b *BroadcastBatch) createMessage(rt *runtime.Runtime, oa *OrgAssets, c *Co return nil, fmt.Errorf("error creating flow contact for broadcast message: %w", err) } - content, locale := b.Translations.ForContact(oa.Env(), contact, b.BaseLanguage) + content, locale := b.Broadcast.Translations.ForContact(oa.Env(), contact, b.Broadcast.BaseLanguage) var expressionsContext *types.XObject - if b.Expressions { + if b.Broadcast.Expressions { expressionsContext = types.NewXObject(map[string]types.XValue{ "contact": flows.Context(oa.Env(), contact), "fields": flows.Context(oa.Env(), contact.Fields()), @@ -333,9 +347,9 @@ func (b *BroadcastBatch) createMessage(rt *runtime.Runtime, oa *OrgAssets, c *Co } // create our outgoing message - out, ch := CreateMsgOut(rt, oa, contact, content, b.TemplateID, b.TemplateVariables, locale, expressionsContext) + out, ch := CreateMsgOut(rt, oa, contact, content, b.Broadcast.TemplateID, b.Broadcast.TemplateVariables, locale, expressionsContext) - msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), ch, contact, out, b) + msg, err := NewOutgoingBroadcastMsg(rt, oa.Org(), ch, contact, out, b.Broadcast) if err != nil { return nil, fmt.Errorf("error creating outgoing message: %w", err) } diff --git a/core/models/broadcasts_test.go b/core/models/broadcasts_test.go index 54bf9f89d..bb63ba185 100644 --- a/core/models/broadcasts_test.go +++ b/core/models/broadcasts_test.go @@ -126,10 +126,7 @@ func TestNonPersistentBroadcasts(t *testing.T) { batch := bcast.CreateBatch([]models.ContactID{testdata.Alexandria.ID, testdata.Bob.ID}, false) assert.Equal(t, models.NilBroadcastID, batch.BroadcastID) - assert.Equal(t, testdata.Org1.ID, batch.OrgID) - assert.Equal(t, i18n.Language("eng"), batch.BaseLanguage) - assert.Equal(t, translations, batch.Translations) - assert.Equal(t, optIn.ID, batch.OptInID) + assert.NotNil(t, testdata.Org1.ID, batch.Broadcast) assert.Equal(t, []models.ContactID{testdata.Alexandria.ID, testdata.Bob.ID}, batch.ContactIDs) oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID) @@ -153,9 +150,6 @@ func TestBroadcastBatchCreateMessage(t *testing.T) { oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshOptIns) require.NoError(t, err) - // we need a broadcast id to insert messages but the content here is ignored - bcastID := testdata.InsertBroadcast(rt, testdata.Org1, "eng", map[i18n.Language]string{"eng": "Test"}, nil, models.NilScheduleID, nil, nil) - tcs := []struct { contactLanguage i18n.Language contactURN urns.URN @@ -249,8 +243,7 @@ func TestBroadcastBatchCreateMessage(t *testing.T) { contact := testdata.InsertContact(rt, testdata.Org1, flows.ContactUUID(uuids.NewV4()), "Felix", tc.contactLanguage, models.ContactStatusActive) testdata.InsertContactURN(rt, testdata.Org1, contact, tc.contactURN, 1000, nil) - batch := &models.BroadcastBatch{ - BroadcastID: bcastID, + bcast := &models.Broadcast{ OrgID: testdata.Org1.ID, Translations: tc.translations, BaseLanguage: tc.baseLanguage, @@ -258,7 +251,15 @@ func TestBroadcastBatchCreateMessage(t *testing.T) { OptInID: tc.optInID, TemplateID: tc.templateID, TemplateVariables: tc.templateVariables, - ContactIDs: []models.ContactID{contact.ID}, + } + err = models.InsertBroadcast(ctx, rt.DB, bcast) + require.NoError(t, err) + + batch := &models.BroadcastBatch{ + BroadcastID: bcast.ID, + Broadcast: bcast, + ContactIDs: []models.ContactID{contact.ID}, + IsLast: true, } msgs, err := batch.CreateMessages(ctx, rt, oa) diff --git a/core/models/msgs.go b/core/models/msgs.go index effa82ce8..f1295b9a1 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -355,8 +355,8 @@ func NewOutgoingFlowMsg(rt *runtime.Runtime, org *Org, channel *Channel, session } // NewOutgoingBroadcastMsg creates an outgoing message which is part of a broadcast -func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, bb *BroadcastBatch) (*Msg, error) { - return newOutgoingTextMsg(rt, org, channel, contact, out, nil, nil, bb.BroadcastID, NilTicketID, bb.OptInID, bb.CreatedByID, dates.Now()) +func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, b *Broadcast) (*Msg, error) { + return newOutgoingTextMsg(rt, org, channel, contact, out, nil, nil, b.ID, NilTicketID, b.OptInID, b.CreatedByID, dates.Now()) } // NewOutgoingTicketMsg creates an outgoing message from a ticket diff --git a/core/msgio/courier_test.go b/core/msgio/courier_test.go index 6fe794458..16549343d 100644 --- a/core/msgio/courier_test.go +++ b/core/msgio/courier_test.go @@ -153,7 +153,7 @@ func TestNewCourierMsg(t *testing.T) { // try a broadcast message which won't have session and flow fields set and won't be high priority bcastID := testdata.InsertBroadcast(rt, testdata.Org1, `eng`, map[i18n.Language]string{`eng`: "Blast"}, nil, models.NilScheduleID, []*testdata.Contact{testFred}, nil) bcastMsg1 := flows.NewMsgOut(fredURN, assets.NewChannelReference(testdata.TwilioChannel.UUID, "Test Channel"), &flows.MsgContent{Text: "Blast"}, nil, flows.NilMsgTopic, i18n.NilLocale, flows.NilUnsendableReason) - msg3, err := models.NewOutgoingBroadcastMsg(rt, oa.Org(), twilio, fred, bcastMsg1, &models.BroadcastBatch{BroadcastID: bcastID, OptInID: optInID, CreatedByID: testdata.Admin.ID}) + msg3, err := models.NewOutgoingBroadcastMsg(rt, oa.Org(), twilio, fred, bcastMsg1, &models.Broadcast{ID: bcastID, OptInID: optInID, CreatedByID: testdata.Admin.ID}) require.NoError(t, err) err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg3}) diff --git a/core/tasks/msgs/send_broadcast_batch.go b/core/tasks/msgs/send_broadcast_batch.go index 068a9241c..23059f24e 100644 --- a/core/tasks/msgs/send_broadcast_batch.go +++ b/core/tasks/msgs/send_broadcast_batch.go @@ -36,6 +36,24 @@ func (t *SendBroadcastBatchTask) WithAssets() models.Refresh { } func (t *SendBroadcastBatchTask) Perform(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets) error { + var bcast *models.Broadcast + var err error + + // if this batch belongs to a persisted broadcast, fetch it + if t.BroadcastID != models.NilBroadcastID { + bcast, err = models.GetBroadcastByID(ctx, rt.DB, t.BroadcastID) + if err != nil { + return fmt.Errorf("error loading flow start for batch: %w", err) + } + } else { + bcast = t.Broadcast // otherwise use broadcast from the task + } + + // if this broadcast was interrupted, we're done + if bcast.Status == models.BroadcastStatusInterrupted { + return nil + } + // create this batch of messages msgs, err := t.BroadcastBatch.CreateMessages(ctx, rt, oa) if err != nil { @@ -46,7 +64,7 @@ func (t *SendBroadcastBatchTask) Perform(ctx context.Context, rt *runtime.Runtim // if this is our last batch, mark broadcast as done if t.IsLast { - if err := (&models.Broadcast{ID: t.BroadcastBatch.BroadcastID}).SetCompleted(ctx, rt.DB); err != nil { + if err := bcast.SetCompleted(ctx, rt.DB); err != nil { return fmt.Errorf("error marking broadcast as complete: %w", err) } }