Skip to content

Commit

Permalink
[pause_proc_timer][2/n] Add pause_proc_timer option to suspend_process/2
Browse files Browse the repository at this point in the history
We add a new `pause_proc_timer` option to the `erlang:suspend_process/2`
BIF. When given, the process is not only suspended, but its proc timer,
if set, will be paused. This means that if the paused process is waiting
on a `receive`, it will not timeout even if suspended for long.

Each time pause_proc_timer is given, a counter is bumped in the
suspend-process monitor. In order to decrease it, the (new) BIF
`erlang:resume_process/2` needs to be called with the option
`resume_proc_timer`. When the count reaches zero, the timer is
resumed (even though the process may still be suspended).

We add testcases for this functionality
  • Loading branch information
jcpetruzza committed Jan 21, 2025
1 parent c0b0255 commit 640ea44
Show file tree
Hide file tree
Showing 11 changed files with 435 additions and 109 deletions.
2 changes: 2 additions & 0 deletions erts/emulator/beam/atom.names
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ atom parent
atom Plus='+'
atom PlusPlus='++'
atom pause
atom pause_proc_timer
atom pending
atom pending_driver
atom pending_process
Expand Down Expand Up @@ -620,6 +621,7 @@ atom reset
atom reset_seq_trace
atom restart
atom resume
atom resume_proc_timer
atom return_from
atom return_to
atom return_to_trace
Expand Down
2 changes: 1 addition & 1 deletion erts/emulator/beam/bif.tab
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ bif erlang:seq_trace_info/1
bif erlang:seq_trace_print/1
bif erlang:seq_trace_print/2
bif erts_internal:suspend_process/2
bif erlang:resume_process/1
bif erlang:resume_process/2
bif erts_internal:process_display/2

bif erlang:bump_reductions/1
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/erl_monitor_link.c
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,7 @@ erts_monitor_create(Uint16 type, Eterm ref, Eterm orgn, Eterm trgt, Eterm name,

msp->next = NULL;
erts_atomic_init_relb(&msp->state, 0);
msp->ptimer_count = 0;
erts_atomic32_init_nob(&mdp->refc, 2);
break;
}
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/erl_monitor_link.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ struct ErtsMonitorSuspend__ {
ErtsMonitorData md; /* origin = suspender; target = suspendee */
ErtsMonitorSuspend *next;
erts_atomic_t state;
int ptimer_count;
};
#define ERTS_MSUSPEND_STATE_FLG_ACTIVE ((erts_aint_t) (((Uint) 1) << (sizeof(Uint)*8 - 1)))
#define ERTS_MSUSPEND_STATE_COUNTER_MASK (~ERTS_MSUSPEND_STATE_FLG_ACTIVE)
Expand Down
28 changes: 14 additions & 14 deletions erts/emulator/beam/erl_proc_sig_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -4992,6 +4992,17 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing,
return ((int) reds)*4 + 8;
}

static void
activate_suspend_monitor(Process *c_p, ErtsMonitorSuspend *msp)
{
erts_aint_t mstate;

mstate = erts_atomic_read_bor_acqb(&msp->state,
ERTS_MSUSPEND_STATE_FLG_ACTIVE);
ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
}

static void
handle_suspend(Process *c_p, ErtsMonitor *mon, int *yieldp)
{
Expand All @@ -5000,14 +5011,8 @@ handle_suspend(Process *c_p, ErtsMonitor *mon, int *yieldp)
ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND);

if (!(state & ERTS_PSFLG_DIRTY_RUNNING)) {
ErtsMonitorSuspend *msp;
erts_aint_t mstate;

msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
mstate = erts_atomic_read_bor_acqb(&msp->state,
ERTS_MSUSPEND_STATE_FLG_ACTIVE);
ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
ErtsMonitorSuspend *msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
activate_suspend_monitor(c_p, msp);
*yieldp = !0;
}
else {
Expand Down Expand Up @@ -5213,12 +5218,7 @@ erts_proc_sig_handle_pending_suspend(Process *c_p)
msp->next = NULL;
if (!(state & ERTS_PSFLG_EXITING)
&& erts_monitor_is_in_table(&msp->md.u.target)) {
erts_aint_t mstate;

mstate = erts_atomic_read_bor_acqb(&msp->state,
ERTS_MSUSPEND_STATE_FLG_ACTIVE);
ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
activate_suspend_monitor(c_p, msp);
}

erts_monitor_release(&msp->md.u.target);
Expand Down
104 changes: 94 additions & 10 deletions erts/emulator/beam/erl_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -8896,6 +8896,11 @@ erts_start_schedulers(void)
}
}

static Eterm
sched_pause_proc_timer(Process *c_p, void *vst, int *redsp, ErlHeapFragment **bp);
static Eterm
sched_resume_paused_proc_timer(Process *c_p, void *vst, int *redsp, ErlHeapFragment **bp);

BIF_RETTYPE
erts_internal_suspend_process_2(BIF_ALIST_2)
{
Expand All @@ -8906,6 +8911,7 @@ erts_internal_suspend_process_2(BIF_ALIST_2)
int sync = 0;
int async = 0;
int unless_suspending = 0;
int pause_proc_timer = 0;
erts_aint_t mstate;
ErtsMonitorSuspend *msp;
ErtsMonitorData *mdp;
Expand All @@ -8930,6 +8936,9 @@ erts_internal_suspend_process_2(BIF_ALIST_2)
case am_asynchronous:
async = 1;
break;
case am_pause_proc_timer:
pause_proc_timer = 1;
break;
default: {
if (is_tuple_arity(arg, 2)) {
Eterm *tp = tuple_val(arg);
Expand Down Expand Up @@ -9029,15 +9038,31 @@ erts_internal_suspend_process_2(BIF_ALIST_2)
sync = !async;
}
else {
noproc:
erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), &mdp->origin);
erts_monitor_release_both(mdp);
if (!async)
res = am_badarg;
goto noproc;
}
}
}

if (pause_proc_timer) {
int proc_timer_already_paused = msp->ptimer_count++;

if (!proc_timer_already_paused) {
erts_proc_sig_send_rpc_request(BIF_P,
BIF_ARG_1,
0, /* no reply */
sched_pause_proc_timer,
NULL);
}
}

while(0) {
noproc:
erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), &mdp->origin);
erts_monitor_release_both(mdp);
if (!async)
res = am_badarg;
}

if (sync) {
ASSERT(is_non_value(reply_tag));
reply_res = res;
Expand All @@ -9052,22 +9077,43 @@ erts_internal_suspend_process_2(BIF_ALIST_2)
}

/*
* The erlang:resume_process/1 BIF
* The erlang:resume_process/2 BIF
*/

BIF_RETTYPE
resume_process_1(BIF_ALIST_1)
resume_process_2(BIF_ALIST_2)
{
ErtsMonitor *mon;
ErtsMonitorSuspend *msp;
erts_aint_t mstate;

int prev_suspend_count;
int resume_proc_timer = 0;

if (BIF_P->common.id == BIF_ARG_1)
BIF_ERROR(BIF_P, BADARG);

if (!is_internal_pid(BIF_ARG_1))
BIF_ERROR(BIF_P, BADARG);

if (is_not_nil(BIF_ARG_2)) {
/* Parse option list */
Eterm arg = BIF_ARG_2;
while (is_list(arg)) {
Eterm *lp = list_val(arg);
arg = CAR(lp);
switch (arg) {
case am_resume_proc_timer:
resume_proc_timer = 1;
break;
default:
BIF_ERROR(BIF_P, BADARG);
}
arg = CDR(lp);
}
if (is_not_nil(arg))
BIF_ERROR(BIF_P, BADARG);
}

mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(BIF_P),
BIF_ARG_1);
if (!mon) {
Expand All @@ -9078,18 +9124,56 @@ resume_process_1(BIF_ALIST_1)
ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND);
msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);

if (resume_proc_timer && msp->ptimer_count == 0) {
BIF_ERROR(BIF_P, BADARG);
}

mstate = erts_atomic_dec_read_relb(&msp->state);
prev_suspend_count = mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK;

ASSERT((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) >= 0);
ASSERT(prev_suspend_count >= 0);

if (msp->ptimer_count == prev_suspend_count + 1 && !resume_proc_timer) {
erts_atomic_inc_nob(&msp->state);
BIF_ERROR(BIF_P, BADARG);
}

if ((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) == 0) {
if (resume_proc_timer) {
int needs_to_resume_timer = --msp->ptimer_count == 0;
if (needs_to_resume_timer) {
erts_proc_sig_send_rpc_request(BIF_P,
BIF_ARG_1,
0, /* no reply */
sched_resume_paused_proc_timer,
NULL);
}
}

if (prev_suspend_count == 0) {
erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), mon);
erts_proc_sig_send_demonitor(&BIF_P->common, BIF_P->common.id, 0, mon);
}

BIF_RET(am_true);
}

static Eterm
sched_pause_proc_timer(Process *c_p, void *vst, int *redsp, ErlHeapFragment **bp)
{
erts_pause_proc_timer(c_p);
*redsp = 1;
return THE_NON_VALUE;
}

static Eterm
sched_resume_paused_proc_timer(Process *c_p, void *vst, int *redsp, ErlHeapFragment **bp)
{
erts_resume_paused_proc_timer(c_p);
*redsp = 1;
return THE_NON_VALUE;
}


BIF_RETTYPE
erts_internal_is_process_executing_dirty_1(BIF_ALIST_1)
{
Expand Down
Loading

0 comments on commit 640ea44

Please sign in to comment.