Skip to content

Commit

Permalink
Merge pull request #1302 from nyaruka/expires_changes
Browse files Browse the repository at this point in the history
Wait expiration changes
  • Loading branch information
rowanseymour authored Jan 24, 2025
2 parents 1bba8cc + f7aee86 commit ca47382
Show file tree
Hide file tree
Showing 24 changed files with 552 additions and 476 deletions.
6 changes: 3 additions & 3 deletions cmd/flowrunner/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ func TestPrintEvent(t *testing.T) {
{events.NewContactRefreshed(session.Contact()), `👤 contact refreshed on resume`},
{events.NewContactTimezoneChanged(session.Environment().Timezone()), `🕑 timezone changed to 'America/Guayaquil'`},
{events.NewDialEnded(flows.NewDial(flows.DialStatusBusy, 3)), `☎️ dial ended with 'busy'`},
{events.NewDialWait(urns.URN(`tel:+1234567890`), 20, 120, nil), `⏳ waiting for dial (type /dial <answered|no_answer|busy|failed>)...`},
{events.NewDialWait(urns.URN(`tel:+1234567890`), 20, 120, expiresOn), `⏳ waiting for dial (type /dial <answered|no_answer|busy|failed>)...`},
{events.NewEmailSent([]string{"[email protected]"}, "Hi", "What up?"), `✉️ email sent with subject 'Hi'`},
{events.NewEnvironmentRefreshed(session.Environment()), `⚙️ environment refreshed on resume`},
{events.NewErrorf("this didn't work"), `⚠️ this didn't work`},
{events.NewFailure(errors.New("this really didn't work")), `🛑 this really didn't work`},
{events.NewFlowEntered(flow.Reference(false), "", false), `↪️ entered flow 'Registration'`},
{events.NewInputLabelsAdded("2a786bbc-2314-4d57-a0c9-b66e1642e5e2", []*flows.Label{sa.Labels().FindByName("Spam")}), `🏷️ labeled with 'Spam'`},
{events.NewMsgWait(nil, nil, nil), `⏳ waiting for message...`},
{events.NewMsgWait(&timeout, &expiresOn, nil), `⏳ waiting for message (3 sec timeout, type /timeout to simulate)...`},
{events.NewMsgWait(nil, expiresOn, nil), `⏳ waiting for message...`},
{events.NewMsgWait(&timeout, expiresOn, nil), `⏳ waiting for message (3 sec timeout, type /timeout to simulate)...`},
}

for _, tc := range tests {
Expand Down
14 changes: 7 additions & 7 deletions flows/actions/testdata/TestResthookPayload_resthook_payload.snap
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@
"uuid": "970b8069-50f5-4f6f-8f41-6b2d9f33d623"
},
{
"arrived_on": "2018-07-06T12:30:23.123456Z",
"arrived_on": "2018-07-06T12:30:24.123456Z",
"exit_uuid": "d898f9a4-f0fc-4ac4-a639-c98c602bb511",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
"uuid": "5ecda5fc-951c-437b-a17e-f85e49829fb9"
},
{
"arrived_on": "2018-07-06T12:30:51.123456Z",
"arrived_on": "2018-07-06T12:30:52.123456Z",
"exit_uuid": "9fc5f8b4-2247-43db-b899-ab1ac50ba06c",
"node_uuid": "c0781400-737f-4940-9a6c-1ec1c3df0325",
"uuid": "312d3af0-a565-4c96-ba00-bd7f0d08e671"
Expand All @@ -71,7 +71,7 @@
"2factor": {
"category": "",
"category_localized": "",
"created_on": "2018-07-06T12:30:32.123456Z",
"created_on": "2018-07-06T12:30:33.123456Z",
"input": "",
"name": "2Factor",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
Expand All @@ -80,7 +80,7 @@
"favorite_color": {
"category": "Red",
"category_localized": "Red",
"created_on": "2018-07-06T12:30:28.123456Z",
"created_on": "2018-07-06T12:30:29.123456Z",
"input": "",
"name": "Favorite Color",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
Expand All @@ -89,7 +89,7 @@
"intent": {
"category": "Success",
"category_localized": "Success",
"created_on": "2018-07-06T12:30:46.123456Z",
"created_on": "2018-07-06T12:30:47.123456Z",
"input": "Hi there",
"name": "Intent",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
Expand All @@ -98,7 +98,7 @@
"phone_number": {
"category": "",
"category_localized": "",
"created_on": "2018-07-06T12:30:24.123456Z",
"created_on": "2018-07-06T12:30:25.123456Z",
"input": "",
"name": "Phone Number",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
Expand All @@ -107,7 +107,7 @@
"webhook": {
"category": "Success",
"category_localized": "Success",
"created_on": "2018-07-06T12:30:40.123456Z",
"created_on": "2018-07-06T12:30:41.123456Z",
"input": "GET http://127.0.0.1:49999/?content=%7B%22results%22%3A%5B%7B%22state%22%3A%22WA%22%7D%2C%7B%22state%22%3A%22IN%22%7D%5D%7D",
"name": "webhook",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
Expand Down
61 changes: 36 additions & 25 deletions flows/definition/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"maps"
"slices"
"time"

"github.com/Masterminds/semver"
"github.com/nyaruka/gocommon/i18n"
Expand All @@ -31,15 +32,15 @@ func IsVersionSupported(v *semver.Version) bool {

type flow struct {
// spec properties
uuid assets.FlowUUID
name string
specVersion *semver.Version
language i18n.Language
flowType flows.FlowType
revision int
expireAfterMinutes int
localization flows.Localization
nodes []flows.Node
uuid assets.FlowUUID
name string
specVersion *semver.Version
language i18n.Language
flowType flows.FlowType
revision int
expireAfter time.Duration
localization flows.Localization
nodes []flows.Node

// optional properties not used by engine itself
ui json.RawMessage
Expand All @@ -52,20 +53,20 @@ type flow struct {
}

// NewFlow creates a new flow
func NewFlow(uuid assets.FlowUUID, name string, language i18n.Language, flowType flows.FlowType, revision int, expireAfterMinutes int, localization flows.Localization, nodes []flows.Node, ui json.RawMessage, a assets.Flow) (flows.Flow, error) {
func NewFlow(uuid assets.FlowUUID, name string, language i18n.Language, flowType flows.FlowType, revision int, expireAfter time.Duration, localization flows.Localization, nodes []flows.Node, ui json.RawMessage, a assets.Flow) (flows.Flow, error) {
f := &flow{
uuid: uuid,
name: name,
specVersion: CurrentSpecVersion,
language: language,
flowType: flowType,
revision: revision,
expireAfterMinutes: expireAfterMinutes,
localization: localization,
nodes: nodes,
nodeMap: make(map[flows.NodeUUID]flows.Node, len(nodes)),
ui: ui,
asset: a,
uuid: uuid,
name: name,
specVersion: CurrentSpecVersion,
language: language,
flowType: flowType,
revision: revision,
expireAfter: expireAfter,
localization: localization,
nodes: nodes,
nodeMap: make(map[flows.NodeUUID]flows.Node, len(nodes)),
ui: ui,
asset: a,
}

for _, node := range f.nodes {
Expand All @@ -85,12 +86,22 @@ func (f *flow) SpecVersion() *semver.Version { return f.specVersion }
func (f *flow) Revision() int { return f.revision }
func (f *flow) Language() i18n.Language { return f.language }
func (f *flow) Type() flows.FlowType { return f.flowType }
func (f *flow) ExpireAfterMinutes() int { return f.expireAfterMinutes }
func (f *flow) Nodes() []flows.Node { return f.nodes }
func (f *flow) Localization() flows.Localization { return f.localization }
func (f *flow) UI() json.RawMessage { return f.ui }
func (f *flow) GetNode(uuid flows.NodeUUID) flows.Node { return f.nodeMap[uuid] }

func (f *flow) ExpireAfter() time.Duration {
if f.expireAfter == 0 {
if f.flowType == flows.FlowTypeMessaging {
return 10080 * time.Minute
} else if f.flowType == flows.FlowTypeVoice {
return 5 * time.Minute
}
}
return f.expireAfter
}

func (f *flow) validate() error {
// track UUIDs used by nodes and actions to ensure that they are unique
seenUUIDs := make(map[uuids.UUID]bool)
Expand Down Expand Up @@ -348,7 +359,7 @@ func readFlow(data json.RawMessage, mc *migrations.Config, a assets.Flow) (flows
e.Localization = make(localization)
}

return NewFlow(e.UUID, e.Name, e.Language, e.Type, e.Revision, e.ExpireAfterMinutes, e.Localization, nodes, e.UI, a)
return NewFlow(e.UUID, e.Name, e.Language, e.Type, e.Revision, time.Duration(e.ExpireAfterMinutes)*time.Minute, e.Localization, nodes, e.UI, a)
}

// MarshalJSON marshals this flow into JSON
Expand All @@ -362,7 +373,7 @@ func (f *flow) MarshalJSON() ([]byte, error) {
Language: f.language,
Type: f.flowType,
Revision: f.revision,
ExpireAfterMinutes: f.expireAfterMinutes,
ExpireAfterMinutes: int(f.expireAfter / time.Minute),
Localization: f.localization.(localization),
Nodes: make([]*node, len(f.nodes)),
UI: f.ui,
Expand Down
5 changes: 3 additions & 2 deletions flows/definition/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/Masterminds/semver"
"github.com/nyaruka/gocommon/i18n"
Expand Down Expand Up @@ -219,8 +220,8 @@ func TestNewFlow(t *testing.T) {
"Test Flow", // name
i18n.Language("eng"), // base language
flows.FlowTypeMessaging,
123, // revision
30, // expires after minutes
123, // revision
30*time.Minute, // expires after minutes
definition.NewLocalization(),
[]flows.Node{
definition.NewNode(
Expand Down
5 changes: 2 additions & 3 deletions flows/events/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func TestEventMarshaling(t *testing.T) {

tz, _ := time.LoadLocation("Africa/Kigali")
timeout := 500
expiresOn := time.Date(2022, 2, 3, 13, 45, 30, 0, time.UTC)
gender := session.Assets().Fields().Get("gender")
jotd := session.Assets().OptIns().Get("248be71d-78e9-4d71-a6c4-9981d369e5cb")
weather := session.Assets().Topics().Get("472a7a73-96cb-4736-b567-056d987cc5b4")
Expand Down Expand Up @@ -504,7 +503,7 @@ func TestEventMarshaling(t *testing.T) {
}`,
},
{
events.NewMsgWait(&timeout, &expiresOn, hints.NewImageHint()),
events.NewMsgWait(&timeout, time.Date(2022, 2, 3, 13, 45, 30, 0, time.UTC), hints.NewImageHint()),
`{
"type": "msg_wait",
"created_on": "2018-10-18T14:20:30.000123456Z",
Expand Down Expand Up @@ -532,7 +531,7 @@ func TestEventMarshaling(t *testing.T) {
}`,
},
{
events.NewDialWait(urns.URN("tel:+1234567890"), 20, 120, &expiresOn),
events.NewDialWait(urns.URN("tel:+1234567890"), 20, 120, time.Date(2022, 2, 3, 13, 45, 30, 0, time.UTC)),
`{
"type": "dial_wait",
"created_on": "2018-10-18T14:20:30.000123456Z",
Expand Down
4 changes: 2 additions & 2 deletions flows/events/dial_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ type DialWaitEvent struct {
CallLimitSeconds int `json:"call_limit_seconds"`

// when this wait expires and the whole run can be expired
ExpiresOn *time.Time `json:"expires_on,omitempty"`
ExpiresOn time.Time `json:"expires_on,omitempty"`
}

// NewDialWait returns a new dial wait with the passed in URN
func NewDialWait(urn urns.URN, dialLimitSeconds, callLimitSeconds int, expiresOn *time.Time) *DialWaitEvent {
func NewDialWait(urn urns.URN, dialLimitSeconds, callLimitSeconds int, expiresOn time.Time) *DialWaitEvent {
return &DialWaitEvent{
BaseEvent: NewBaseEvent(TypeDialWait),
URN: urn,
Expand Down
6 changes: 3 additions & 3 deletions flows/events/msg_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ type MsgWaitEvent struct {
TimeoutSeconds *int `json:"timeout_seconds,omitempty"`

// When this wait expires and the whole run can be expired
ExpiresOn *time.Time `json:"expires_on,omitempty"`
ExpiresOn time.Time `json:"expires_on,omitempty"`

Hint flows.Hint `json:"hint,omitempty"`
}

// NewMsgWait returns a new msg wait with the passed in timeout
func NewMsgWait(timeoutSeconds *int, expiresOn *time.Time, hint flows.Hint) *MsgWaitEvent {
func NewMsgWait(timeoutSeconds *int, expiresOn time.Time, hint flows.Hint) *MsgWaitEvent {
return &MsgWaitEvent{
BaseEvent: NewBaseEvent(TypeMsgWait),
TimeoutSeconds: timeoutSeconds,
Expand All @@ -63,7 +63,7 @@ type msgWaitEnvelope struct {
BaseEvent

TimeoutSeconds *int `json:"timeout_seconds,omitempty"`
ExpiresOn *time.Time `json:"expires_on,omitempty"`
ExpiresOn time.Time `json:"expires_on,omitempty"`
Hint json.RawMessage `json:"hint,omitempty"`
}

Expand Down
2 changes: 1 addition & 1 deletion flows/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type Flow interface {
Revision() int
Language() i18n.Language
Type() FlowType
ExpireAfterMinutes() int
ExpireAfter() time.Duration
Localization() Localization
UI() json.RawMessage
Nodes() []Node
Expand Down
9 changes: 2 additions & 7 deletions flows/routers/waits/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,8 @@ func (w *baseWait) Timeout() flows.Timeout {
return w.timeout
}

func (w *baseWait) expiresOn(run flows.Run) *time.Time {
expiresAfterMins := run.Flow().ExpireAfterMinutes()
if expiresAfterMins > 0 {
dt := dates.Now().Add(time.Duration(int64(expiresAfterMins) * int64(time.Minute)))
return &dt
}
return nil
func (w *baseWait) expiresOn(run flows.Run) time.Time {
return dates.Now().Add(run.Flow().ExpireAfter())
}

//------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion flows/routers/waits/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (w *DialWait) Begin(run flows.Run, log flows.EventCallback) bool {
// flow so calculate an expiry guaranteed to be after the wait returns
expiresOn := dates.Now().Add(w.dialLimit + w.callLimit + time.Second*30)

log(events.NewDialWait(urn, int(w.dialLimit/time.Second), int(w.callLimit/time.Second), &expiresOn))
log(events.NewDialWait(urn, int(w.dialLimit/time.Second), int(w.callLimit/time.Second), expiresOn))

return true
}
Expand Down
Loading

0 comments on commit ca47382

Please sign in to comment.