Skip to content

Commit

Permalink
Add support for IVR session expires to new contact fire cron
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Jan 24, 2025
1 parent fc2a5e7 commit 3ab2385
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 40 deletions.
109 changes: 70 additions & 39 deletions core/tasks/contacts/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/ivr"
"github.com/nyaruka/mailroom/runtime"
)

Expand All @@ -34,7 +35,7 @@ func (c *FiresCron) AllInstances() bool {

func (c *FiresCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string]any, error) {
start := time.Now()
numExpires, numTimeouts, numCampaigns := 0, 0, 0
numExpires, numHangups, numTimeouts := 0, 0, 0

rc := rt.RP.Get()
defer rc.Close()
Expand All @@ -48,48 +49,78 @@ func (c *FiresCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string]an
break
}

type orgAndType struct {
orgID models.OrgID
typ models.ContactFireType
}
// organize fires by org and type
expirations := make(map[models.OrgID][]*models.ContactFire, 100)
hangups := make(map[models.OrgID][]*models.ContactFire, 100)
timeouts := make(map[models.OrgID][]*models.ContactFire, 100)

// organize tasks by org and type
byOrgAndType := make(map[orgAndType][]*models.ContactFire)
for _, f := range fires {
byOrgAndType[orgAndType{f.OrgID, f.Type}] = append(byOrgAndType[orgAndType{f.OrgID, f.Type}], f)
if f.Type == models.ContactFireTypeWaitExpiration {
if f.Extra.V.CallID == models.NilCallID {
expirations[f.OrgID] = append(expirations[f.OrgID], f)
} else {
hangups[f.OrgID] = append(hangups[f.OrgID], f)
}
} else if f.Type == models.ContactFireTypeWaitTimeout {
timeouts[f.OrgID] = append(timeouts[f.OrgID], f)
} else if f.Type == models.ContactFireTypeCampaign {
// TODO
}
}

// turn expires into bulk expire tasks
for orgID, orgExpires := range expirations {
for batch := range slices.Chunk(orgExpires, c.taskBatchSize) {
es := make([]*Expiration, len(batch))
for i, f := range batch {
es[i] = &Expiration{ContactID: f.ContactID, SessionID: f.Extra.V.SessionID, ModifiedOn: f.Extra.V.SessionModifiedOn}
}

// put expirations in throttled queue but high priority so they get priority over flow starts etc
if err := tasks.Queue(rc, tasks.ThrottledQueue, orgID, &BulkSessionExpireTask{Expirations: es}, true); err != nil {
return nil, fmt.Errorf("error queuing bulk fire task for org #%d: %w", orgID, err)
}
numExpires += len(batch)

if err := models.DeleteContactFires(ctx, rt, batch); err != nil {
return nil, fmt.Errorf("error deleting queued contact fires: %w", err)
}
}
}

for ot, fs := range byOrgAndType {
for batch := range slices.Chunk(fs, c.taskBatchSize) {
if ot.typ == models.ContactFireTypeWaitExpiration {
es := make([]*Expiration, len(batch))
for i, f := range batch {
es[i] = &Expiration{ContactID: f.ContactID, SessionID: f.Extra.V.SessionID, ModifiedOn: f.Extra.V.SessionModifiedOn}
}

// put expirations in throttled queue but high priority so they get priority over flow starts etc
if err := tasks.Queue(rc, tasks.ThrottledQueue, ot.orgID, &BulkSessionExpireTask{Expirations: es}, true); err != nil {
return nil, fmt.Errorf("error queuing bulk fire task for org #%d: %w", ot.orgID, err)
}
numExpires += len(batch)

} else if ot.typ == models.ContactFireTypeWaitTimeout {
ts := make([]*Timeout, len(batch))
for i, f := range batch {
ts[i] = &Timeout{ContactID: f.ContactID, SessionID: f.Extra.V.SessionID, ModifiedOn: f.Extra.V.SessionModifiedOn}
}

// put timeouts in throttled queue but high priority so they get priority over flow starts etc
if err := tasks.Queue(rc, tasks.ThrottledQueue, ot.orgID, &BulkSessionTimeoutTask{Timeouts: ts}, true); err != nil {
return nil, fmt.Errorf("error queuing bulk fire task for org #%d: %w", ot.orgID, err)
}
numTimeouts += len(batch)

} else if ot.typ == models.ContactFireTypeCampaign {
// TODO

numCampaigns += len(batch)
// turn voice expires into bulk hangup tasks
for orgID, orgHangups := range hangups {
for batch := range slices.Chunk(orgHangups, c.taskBatchSize) {
hs := make([]*ivr.Hangup, len(batch))
for i, f := range batch {
hs[i] = &ivr.Hangup{SessionID: f.Extra.V.SessionID, CallID: f.Extra.V.CallID}
}

// put hangups in batch queue but high priority so they get priority over imports etc
if err := tasks.Queue(rc, tasks.BatchQueue, orgID, &ivr.BulkCallHangupTask{Hangups: hs}, true); err != nil {
return nil, fmt.Errorf("error queuing bulk fire task for org #%d: %w", orgID, err)
}
numHangups += len(batch)

if err := models.DeleteContactFires(ctx, rt, batch); err != nil {
return nil, fmt.Errorf("error deleting queued contact fires: %w", err)
}
}
}

// turn timeouts into bulk timeout tasks
for orgID, orgTimeouts := range timeouts {
for batch := range slices.Chunk(orgTimeouts, c.taskBatchSize) {
ts := make([]*Timeout, len(batch))
for i, f := range batch {
ts[i] = &Timeout{ContactID: f.ContactID, SessionID: f.Extra.V.SessionID, ModifiedOn: f.Extra.V.SessionModifiedOn}
}

// put timeouts in throttled queue but high priority so they get priority over flow starts etc
if err := tasks.Queue(rc, tasks.ThrottledQueue, orgID, &BulkSessionTimeoutTask{Timeouts: ts}, true); err != nil {
return nil, fmt.Errorf("error queuing bulk fire task for org #%d: %w", orgID, err)
}
numTimeouts += len(batch)

if err := models.DeleteContactFires(ctx, rt, batch); err != nil {
return nil, fmt.Errorf("error deleting queued contact fires: %w", err)
Expand All @@ -103,5 +134,5 @@ func (c *FiresCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string]an
}
}

return map[string]any{"expires": numExpires, "timeouts": numTimeouts, "campaigns": numCampaigns}, nil
return map[string]any{"expires": numExpires, "hangups": numHangups, "timeouts": numTimeouts}, nil
}
20 changes: 19 additions & 1 deletion core/tasks/contacts/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/contacts"
"github.com/nyaruka/mailroom/core/tasks/ivr"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
Expand All @@ -29,13 +30,14 @@ func TestFiresCron(t *testing.T) {
testdata.InsertContactFire(rt, testdata.Org1, testdata.Cathy, models.ContactFireTypeWaitTimeout, "", map[string]any{"session_id": 1234, "session_modified_on": "2025-01-22T17:55:00Z"}, time.Now().Add(3*time.Second))
testdata.InsertContactFire(rt, testdata.Org1, testdata.Bob, models.ContactFireTypeWaitExpiration, "", map[string]any{"session_id": 2345, "session_modified_on": "2025-01-22T17:56:00Z"}, time.Now().Add(-3*time.Second))
testdata.InsertContactFire(rt, testdata.Org1, testdata.Bob, models.ContactFireTypeWaitTimeout, "", map[string]any{"session_id": 1234, "session_modified_on": "2025-01-22T17:55:00Z"}, time.Now().Add(3*time.Second))
testdata.InsertContactFire(rt, testdata.Org1, testdata.George, models.ContactFireTypeWaitExpiration, "", map[string]any{"session_id": 3456, "session_modified_on": "2025-01-22T17:57:00Z", "call_id": 23}, time.Now().Add(-time.Second))
testdata.InsertContactFire(rt, testdata.Org1, testdata.George, models.ContactFireTypeWaitTimeout, "", map[string]any{"session_id": 3456, "session_modified_on": "2025-01-22T17:57:00Z"}, time.Now().Add(-time.Second))
testdata.InsertContactFire(rt, testdata.Org2, testdata.Org2Contact, models.ContactFireTypeWaitTimeout, "", map[string]any{"session_id": 4567, "session_modified_on": "2025-01-22T17:58:00Z"}, time.Now().Add(-time.Second))

cron := contacts.NewFiresCron(3, 5)
res, err := cron.Run(ctx, rt)
assert.NoError(t, err)
assert.Equal(t, map[string]any{"expires": 2, "timeouts": 2, "campaigns": 0}, res)
assert.Equal(t, map[string]any{"expires": 2, "hangups": 1, "timeouts": 2}, res)

// should have created 3 throttled tasks.. unfortunately order is not guaranteed so we sort them
var ts []*queues.Task
Expand Down Expand Up @@ -68,5 +70,21 @@ func TestFiresCron(t *testing.T) {
assert.Len(t, decoded2.Timeouts, 1)
assert.Equal(t, models.SessionID(3456), decoded2.Timeouts[0].SessionID)

// the hangup task should have ended up in the batch queue
task, err := tasks.BatchQueue.Pop(rc)
assert.NoError(t, err)
assert.Equal(t, int(testdata.Org1.ID), task.OwnerID)
assert.Equal(t, "bulk_call_hangup", task.Type)

decoded3 := &ivr.BulkCallHangupTask{}
jsonx.MustUnmarshal(task.Task, decoded3)
assert.Len(t, decoded3.Hangups, 1)
assert.Equal(t, models.SessionID(3456), decoded3.Hangups[0].SessionID)
assert.Equal(t, models.CallID(23), decoded3.Hangups[0].CallID)

assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM contacts_contactfire`).Returns(2) // only 2 fires in the future left

res, err = cron.Run(ctx, rt)
assert.NoError(t, err)
assert.Equal(t, map[string]any{"expires": 0, "hangups": 0, "timeouts": 0}, res)
}

0 comments on commit 3ab2385

Please sign in to comment.