Skip to content

Commit

Permalink
Feature/2281 reduce bulk send (#464)
Browse files Browse the repository at this point in the history
* reduce evaluation of bulk sent events

* use new event map conversion

* make format
  • Loading branch information
mabiede authored Dec 18, 2024
1 parent 775345d commit afed656
Show file tree
Hide file tree
Showing 53 changed files with 131 additions and 155 deletions.
6 changes: 3 additions & 3 deletions pool/app/changelog/changelog.ml
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ module T (R : RecordSig) = struct

let make_write ?(id = Id.create ()) ?user_uuid ~entity_uuid (before : R.t) (after : R.t)
=
(*** the entity_uuid could also be part of the Reord module, as a function.
This would probably require us to create a Record module for each model,
but provide more flexibility *)
(*** the entity_uuid could also be part of the Reord module, as a function. This would
probably require us to create a Record module for each model, but provide more
flexibility *)
make_changes before after
|> CCOption.map (fun changes ->
Write.
Expand Down
3 changes: 1 addition & 2 deletions pool/app/custom_field/custom_field.ml
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ let validate_partial_update
custom_field |> custom |> check_version old_v |> Lwt_result.lift
;;

(* Replace all ids with the names of the custom_fields and
custom_field_options *)
(* Replace all ids with the names of the custom_fields and custom_field_options *)
let changelog_to_human pool language ({ Changelog.changes; _ } as changelog) =
let open Changelog.Changes in
let id_of_string = CCFun.(Id.validate %> CCOption.of_result) in
Expand Down
4 changes: 2 additions & 2 deletions pool/app/custom_field/version_history.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ module AnswerRecord = struct
let yojson_of_t record =
let field_uuid = "custom_field_uuid" in
let json = yojson_of_t record in
(* Wrap the record json in an assoc, where the uuid is the key, to make sure
the field name is visible in the changelog *)
(* Wrap the record json in an assoc, where the uuid is the key, to make sure the field
name is visible in the changelog *)
match json with
| `List [ `String _; `Assoc answer ] ->
CCList.assoc_opt ~eq:CCString.equal field_uuid answer
Expand Down
1 change: 1 addition & 0 deletions pool/app/email/email.mli
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,4 @@ val sent
-> event

val bulksent : dispatch list -> event
val bulksent_opt : dispatch list -> event list
3 changes: 3 additions & 0 deletions pool/app/email/event.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ let create_sent ?id ?message_template ?job_ctx ?new_email_address ?new_smtp_auth
|> sent ?new_email_address ?new_smtp_auth_id
;;

let bulksent_opt jobs = if CCList.is_empty jobs then [] else [ BulkSent jobs ]

let handle_event pool : event -> unit Lwt.t = function
| Sent ({ job; id; message_template; job_ctx }, new_email_address, new_smtp_auth_id) ->
Email_service.dispatch
Expand All @@ -87,6 +89,7 @@ let handle_event pool : event -> unit Lwt.t = function
?job_ctx
pool
job
| BulkSent [] -> Lwt.return_unit
| BulkSent jobs ->
let jobs =
CCList.map
Expand Down
8 changes: 4 additions & 4 deletions pool/app/filter/entity.ml
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ module Key = struct
match read_hardcoded yojson with
| Some h -> Ok (Hardcoded h)
| None ->
(* The "validate_query" function will check, if the id belongs to an
existing custom field *)
(* The "validate_query" function will check, if the id belongs to an existing custom
field *)
(match yojson with
| `String id -> Ok (CustomField (id |> Custom_field.Id.of_string))
| _ -> Error Pool_message.(Error.Invalid Field.Key))
Expand Down Expand Up @@ -359,8 +359,8 @@ module Operator = struct
;;

let to_sql = function
(* List operators are used to query custom field answers by their value
which store json arrays *)
(* List operators are used to query custom field answers by their value which store
json arrays *)
| ContainsSome | ContainsAll -> "LIKE"
| ContainsNone -> "NOT LIKE"
;;
Expand Down
8 changes: 3 additions & 5 deletions pool/app/filter/repo/repo_utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ let add_list_condition subquery dyn ids =
Error Message.(Error.Invalid Field.Operator)
;;

(* The subquery returns any contacts that has been an assignment to an
experiment. *)
(* The subquery returns any contacts that has been an assignment to an experiment. *)
let assignment_subquery dyn operator ids =
let open CCResult in
let* dyn, query_params = add_uuid_param dyn ids in
Expand Down Expand Up @@ -226,9 +225,8 @@ let invitation_subquery dyn operator ids =
add_list_condition subquery dyn ids operator
;;

(* The subquery does not return any contacts that have shown up at a session of
the current experiment. It does not make a difference, if they
participated. *)
(* The subquery does not return any contacts that have shown up at a session of the
current experiment. It does not make a difference, if they participated. *)
let participation_subquery dyn operator ids =
let open CCResult in
let* dyn, query_params = add_uuid_param dyn ids in
Expand Down
5 changes: 2 additions & 3 deletions pool/app/guard/repo.ml
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,8 @@ end
let src = Logs.Src.create "guard"
module Cache = struct
(* TODO: Once the guardian package has a cached version, this implementation
can be updated/removed (Issue:
https://github.com/uzh/guardian/issues/11) *)
(* TODO: Once the guardian package has a cached version, this implementation can be
updated/removed (Issue: https://github.com/uzh/guardian/issues/11) *)
open CCCache
let equal_find_actor (l1, a1) (l2, a2) =
Expand Down
7 changes: 3 additions & 4 deletions pool/app/message_template/repo/repo_sql.ml
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ let find_by_label_and_language_to_send pool ?entity_uuids label language =
Database.find pool request pv
;;

(* The template are prioritised according to the entity_uuids list, from left to
right. If none are found, the default template will be returned. *)
(* The template are prioritised according to the entity_uuids list, from left to right. If
none are found, the default template will be returned. *)
let find_all_by_label_to_send pool ?entity_uuids languages label =
let open Utils.Lwt_result.Infix in
if CCList.is_empty languages
Expand All @@ -233,8 +233,7 @@ let find_all_by_label_to_send pool ?entity_uuids languages label =
;;

let find_entity_defaults_by_label pool ?entity_uuids languages label =
(* Removing the last uuid from the entity_uuids to so the entity default is
returned *)
(* Removing the last uuid from the entity_uuids to so the entity default is returned *)
let entity_uuids =
match entity_uuids with
| None | Some [] | Some (_ :: []) -> []
Expand Down
4 changes: 2 additions & 2 deletions pool/app/pool_context/entity.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
open Sexplib.Conv

(* TODO: Service.User.t for Admin and Root are placeholders and should be
replaced, when guadrian is implemented *)
(* TODO: Service.User.t for Admin and Root are placeholders and should be replaced, when
guadrian is implemented *)
type user =
| Admin of Admin.t
| Contact of Contact.t
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ let make_value_nullable =
|sql}
;;

(* To keep the migrations clean, I added a default value (Versions only matter
when mutliple people are updating) *)
(* To keep the migrations clean, I added a default value (Versions only matter when
mutliple people are updating) *)
let add_versions_and_admin_values =
Database.Migration.Step.create
~label:"add versions and admin values"
Expand Down
5 changes: 3 additions & 2 deletions pool/app/reminder/service.ml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ let create_email_events data =
, reminders @ emails ))
([], [])
|> fun (session_events, emails) ->
(Email.BulkSent emails |> Pool_event.email) :: session_events
(Email.bulksent_opt emails |> Pool_event.(map email)) @ session_events
;;

let create_text_message_events data =
Expand All @@ -56,7 +56,8 @@ let create_text_message_events data =
, reminders @ text_messages ))
([], [])
|> fun (session_events, text_messages) ->
(Text_message.BulkSent text_messages |> Pool_event.text_message) :: session_events
(Text_message.bulksent_opt text_messages |> Pool_event.(map text_message))
@ session_events
;;

let create_reminder_events
Expand Down
4 changes: 2 additions & 2 deletions pool/app/system_event/repo/repo.ml
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ module Sql = struct

let insert_request =
let open Caqti_request.Infix in
(* TODO: Consider using UPDATE ON DUPLICATE when event_id/service_id pair
should be unique *)
(* TODO: Consider using UPDATE ON DUPLICATE when event_id/service_id pair should be
unique *)
{sql|
INSERT INTO pool_system_event_logs (
event_uuid,
Expand Down
2 changes: 2 additions & 0 deletions pool/app/text_message/event.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type event =
| ReportCreated of delivery_report
[@@deriving eq, show, variants]

let bulksent_opt jobs = if CCList.is_empty jobs then [] else [ BulkSent jobs ]
let sent ?new_recipient job = Sent (job, new_recipient)

let create_sent ?id ?message_template ?job_ctx ?new_recipient job =
Expand All @@ -26,6 +27,7 @@ let create_sent ?id ?message_template ?job_ctx ?new_recipient job =
let handle_event pool : event -> unit Lwt.t = function
| Sent ({ job; id; message_template; job_ctx }, new_recipient) ->
Text_message_service.dispatch ?id ?new_recipient ?message_template ?job_ctx pool job
| BulkSent [] -> Lwt.return_unit
| BulkSent jobs ->
Lwt_list.iter_s
(fun { job; id; message_template; job_ctx } ->
Expand Down
2 changes: 2 additions & 0 deletions pool/app/text_message/text_message.mli
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,5 @@ val create_sent
-> event

val sent : ?new_recipient:Pool_user.CellPhone.t -> job -> event
val bulksent : job list -> event
val bulksent_opt : job list -> event list
6 changes: 3 additions & 3 deletions pool/app/utils/lwt_trace.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(* This module exists to add useful monadic infix operators and to ensure all
monadic actions are implemented using the lwt ppx to always present clean
stack/back traces (https://ocsigen.org/lwt/latest/manual/manual and
(* This module exists to add useful monadic infix operators and to ensure all monadic
actions are implemented using the lwt ppx to always present clean stack/back traces
(https://ocsigen.org/lwt/latest/manual/manual and
https://github.com/ocsigen/lwt/blob/6ce3d557798d2b5736fb458b697cc3eaa13c461e/src/core/lwt.ml#L1694) *)

module Infix = struct
Expand Down
3 changes: 1 addition & 2 deletions pool/app/utils/utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ end
module Html = struct
open Tyxml.Html

(* placed here due to circular dependency between email and http_utils
library *)
(* placed here due to circular dependency between email and http_utils library *)
let handle_line_breaks finally_fcn str =
finally_fcn
@@
Expand Down
4 changes: 2 additions & 2 deletions pool/cqrs_command/admin_command.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ end = struct
let* () =
Pool_user.EmailAddress.validate allowed_email_suffixes command.User_command.email
in
(* TODO: pass Id or Tenant to Admin.Created function as option to further
pass down to permissions *)
(* TODO: pass Id or Tenant to Admin.Created function as option to further pass down to
permissions *)
let admin : Admin.create =
User_command.
{ id
Expand Down
7 changes: 3 additions & 4 deletions pool/cqrs_command/assignment_command.ml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ let assignment_creation_and_confirmation_events
let email_event = Email.sent confirmation_email |> Pool_event.email in
let create_events =
Created (main_assignment, session.Session.id) :: follow_up_events
|> CCList.map Pool_event.assignment
|> Pool_event.(map assignment)
in
Ok (email_event :: create_events)
;;
Expand Down Expand Up @@ -575,9 +575,8 @@ end = struct

let handle ?(tags = Logs.Tag.empty) (assignment_events, emails) =
Logs.info ~src (fun m -> m "Handle command UpdateMatchesFilter" ~tags);
Ok
((assignment_events |> CCList.map Pool_event.assignment)
@ [ Email.BulkSent emails |> Pool_event.email ])
let email_events = Email.bulksent_opt emails |> Pool_event.(map email) in
Ok (Pool_event.(map assignment) assignment_events @ email_events)
;;

let effects id = Session.Guard.Access.update id
Expand Down
5 changes: 2 additions & 3 deletions pool/cqrs_command/contact_command.ml
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,8 @@ end = struct
let handle ?(tags = Logs.Tag.empty) ({ contacts; emails } : t) =
Logs.info ~src (fun m -> m "Handle command SendProfileUpdateTrigger" ~tags);
Ok
[ Contact.ProfileUpdateTriggeredAtUpdated contacts |> Pool_event.contact
; Email.BulkSent emails |> Pool_event.email
]
((Contact.ProfileUpdateTriggeredAtUpdated contacts |> Pool_event.contact)
:: (Email.bulksent_opt emails |> Pool_event.(map email)))
;;

let effects = Contact.Guard.Access.update
Expand Down
2 changes: 1 addition & 1 deletion pool/cqrs_command/custom_field_settings_command.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ end = struct
| Some (_ : string) | None -> `Drop)
fields
in
active @ inactive |> CCList.map Pool_event.custom_field |> CCResult.return
active @ inactive |> Pool_event.(map custom_field) |> CCResult.return
;;

let effects = Custom_field.Guard.Access.create
Expand Down
12 changes: 6 additions & 6 deletions pool/cqrs_command/experiment_command.ml
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,14 @@ end = struct
; matcher_notification_sent = MatcherNotificationSent.create false
}
in
let assignment_events = assignment_events |> CCList.map Pool_event.assignment in
let email_event = Email.BulkSent emails |> Pool_event.email in
let assignment_events = assignment_events |> Pool_event.(map assignment) in
let email_event = Email.bulksent_opt emails |> Pool_event.(map email) in
Ok
([ Filter.Created filter |> Pool_event.filter
; Experiment.Updated (experiment, updated) |> Pool_event.experiment
]
@ assignment_events
@ [ email_event ])
@ email_event)
;;

let effects id =
Expand Down Expand Up @@ -599,16 +599,16 @@ end = struct
=
Logs.info ~src (fun m -> m "Handle command UpdateFilter" ~tags);
let open CCResult in
let assignment_events = assignment_events |> CCList.map Pool_event.assignment in
let email_event = Email.BulkSent emails |> Pool_event.email in
let assignment_events = assignment_events |> Pool_event.(map assignment) in
let email_event = Email.bulksent_opt emails |> Pool_event.(map email) in
let updated_experiiment =
{ experiment with matcher_notification_sent = MatcherNotificationSent.create false }
in
Ok
([ Experiment.Updated (experiment, updated_experiiment) |> Pool_event.experiment
; Filter.Updated (filter, updated_fitler) |> Pool_event.filter
; email_event
]
@ email_event
@ assignment_events)
;;

Expand Down
2 changes: 1 addition & 1 deletion pool/cqrs_command/guardian_command.ml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ end = struct
| true -> if value then `Drop else `Right role_permission)
in
Guard.RolePermissionSaved create :: (destroy |> CCList.map Guard.rolepermissiondeleted)
|> CCList.map Pool_event.guard
|> Pool_event.(map guard)
|> CCResult.return
;;

Expand Down
2 changes: 1 addition & 1 deletion pool/cqrs_command/invitation_command.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ end = struct
Ok
([ Invitation.(Created { contacts; mailing; experiment })
|> Pool_event.invitation
; Email.BulkSent emails |> Pool_event.email
]
@ (Email.bulksent_opt emails |> Pool_event.(map email))
@ contact_update_on_invitation_sent contacts)
| Error err -> Error err)
;;
Expand Down
17 changes: 6 additions & 11 deletions pool/cqrs_command/session_command.ml
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,7 @@ end = struct
in
Ok
((Session.Rescheduled (session, { Session.start; duration }) |> Pool_event.session)
::
(if emails |> CCList.is_empty |> not
then [ Email.BulkSent emails |> Pool_event.email ]
else []))
:: (Email.bulksent_opt emails |> Pool_event.(map email)))
;;

let decode data =
Expand Down Expand Up @@ -912,10 +909,9 @@ end = struct
(Ok ([], []))
in
Ok
[ Email.BulkSent emails |> Pool_event.email
; Text_message.BulkSent text_messages |> Pool_event.text_message
; Session.TextMsgReminderSent session |> Pool_event.session
]
((Email.bulksent_opt emails |> Pool_event.(map email))
@ (Text_message.bulksent_opt text_messages |> Pool_event.(map text_message))
@ [ Session.TextMsgReminderSent session |> Pool_event.session ])
in
Ok events
;;
Expand Down Expand Up @@ -1009,9 +1005,8 @@ end = struct
(make_email_job language email_text email_subject plain_text assignment)
| None, false -> `Drop)
in
[ Text_message.BulkSent sms_jobs |> Pool_event.text_message
; Email.BulkSent email_jobs |> Pool_event.email
]
(Text_message.bulksent_opt sms_jobs |> Pool_event.(map text_message))
@ (Email.bulksent_opt email_jobs |> Pool_event.(map email))
;;

let email_command language email_subject email_text plain_text =
Expand Down
3 changes: 1 addition & 2 deletions pool/cqrs_command/user_command.ml
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ end = struct

let handle ?(tags = Logs.Tag.empty) ?notification user_id command =
Logs.info ~src (fun m -> m "Handle command UpdatePassword" ~tags);
(* NOTE use 'Pool_user.validate_current_password' in handler before this
command. *)
(* NOTE use 'Pool_user.validate_current_password' in handler before this command. *)
let open CCResult in
let* () =
Pool_user.Password.validate_confirmation
Expand Down
4 changes: 2 additions & 2 deletions pool/database/migration/migration.ml
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ let execute_migration database_label migration =
else
Lwt.return (state, Migration_repo.Migration.(steps_to_apply steps (version state)))
| None ->
(* If currently no state exists, an new one will be created. If there are
steps to execute, it is dirty *)
(* If currently no state exists, an new one will be created. If there are steps to
execute, it is dirty *)
Logs.debug (fun m -> m ~tags "Setting up table for %s" namespace);
let dirty = CCList.is_empty steps |> not in
let state = Migration_repo.Migration.create ~namespace ~dirty in
Expand Down
Loading

0 comments on commit afed656

Please sign in to comment.