Skip to content

Commit

Permalink
Throttle bulkhead from the circuit breaker on state transition
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Kipper committed Jul 4, 2019
1 parent 9fb115a commit 13a7241
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 2 deletions.
75 changes: 73 additions & 2 deletions ext/semian/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ check_permissions_arg(VALUE permissions);
static double
check_default_timeout_arg(VALUE default_timeout);

static int
check_throttle_arg(VALUE throttle);

static void
ms_to_timespec(long ms, struct timespec *ts);

Expand Down Expand Up @@ -103,8 +106,13 @@ semian_resource_reset_workers(VALUE self)
TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);

sem_meta_lock(res->sem_id);
// This SETVAL will purge the SEM_UNDO table
ret = semctl(res->sem_id, SI_SEM_REGISTERED_WORKERS, SETVAL, 0);
{
// This SETVAL will purge the SEM_UNDO table
ret = semctl(res->sem_id, SI_SEM_REGISTERED_WORKERS, SETVAL, 0);
if (ret != -1) {
ret = semctl(res->sem_id, SI_SEM_TICKET_THROTTLE, SETVAL, 0);
}
}
sem_meta_unlock(res->sem_id);

if (ret == -1) {
Expand Down Expand Up @@ -238,6 +246,54 @@ semian_resource_in_use(VALUE self)
return Qtrue;
}

VALUE
semian_resource_throttle(VALUE self, VALUE value)
{
semian_resource_t *res = NULL;
TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);

int val = check_throttle_arg(value);
if (val == -1) {
rb_raise(rb_eArgError, "Unknown throttle argument");
}

sem_meta_lock(res->sem_id);
{
// Get the current value
int tickets = get_sem_val(res->sem_id, SI_SEM_CONFIGURED_TICKETS);

if (val > 0) {
int op = tickets - val;
dprintf("Throttling sem_id:%d (op:%d)", res->sem_id, op);

// Use SEM_UNDO so if this process is terminated, the throttle is undone.
if (perform_semop(res->sem_id, SI_SEM_CONFIGURED_TICKETS, -op, SEM_UNDO, NULL) == -1) {
sem_meta_unlock(res->sem_id);
rb_raise(eInternal, "Unknown error");
}
if (semctl(res->sem_id, SI_SEM_TICKET_THROTTLE, SETVAL, op) == -1) {
sem_meta_unlock(res->sem_id);
rb_raise(eInternal, "Unknown error");
}
} else {
int op = get_sem_val(res->sem_id, SI_SEM_TICKET_THROTTLE);
dprintf("Unthrottling sem_id:%d (op:%d)", res->sem_id, op);

if (perform_semop(res->sem_id, SI_SEM_CONFIGURED_TICKETS, op, SEM_UNDO, NULL) == -1) {
sem_meta_unlock(res->sem_id);
rb_raise(eInternal, "Unknown error");
}
if (semctl(res->sem_id, SI_SEM_TICKET_THROTTLE, SETVAL, 0) == -1) {
sem_meta_unlock(res->sem_id);
rb_raise(eInternal, "Unknown error");
}
}
}
sem_meta_unlock(res->sem_id);

return Qnil;
}

static VALUE
cleanup_semian_resource_acquire(VALUE self)
{
Expand Down Expand Up @@ -319,6 +375,21 @@ check_default_timeout_arg(VALUE default_timeout)
return NUM2DBL(default_timeout);
}

static int
check_throttle_arg(VALUE throttle)
{
switch (rb_type(throttle)) {
case T_NIL:
case T_UNDEF:
return 0;
case T_FIXNUM:
case T_BIGNUM:
return RB_NUM2INT(throttle);
default:
return -1;
}
}

static void
ms_to_timespec(long ms, struct timespec *ts)
{
Expand Down
4 changes: 4 additions & 0 deletions ext/semian/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,8 @@ semian_resource_alloc(VALUE klass);
VALUE
semian_resource_in_use(VALUE self);

// Throttle the bulkhead to a given value
VALUE
semian_resource_throttle(VALUE self, VALUE throttle);

#endif //SEMIAN_RESOURCE_H
1 change: 1 addition & 0 deletions ext/semian/semian.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void Init_semian()
rb_define_method(cResource, "reset_registered_workers!", semian_resource_reset_workers, 0);
rb_define_method(cResource, "unregister_worker", semian_resource_unregister_worker, 0);
rb_define_method(cResource, "in_use?", semian_resource_in_use, 0);
rb_define_method(cResource, "throttle", semian_resource_throttle, 1);

id_wait_time = rb_intern("wait_time");
id_timeout = rb_intern("timeout");
Expand Down
1 change: 1 addition & 0 deletions ext/semian/sysv_semaphores.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ initialize_new_semaphore_values(int sem_id, long permissions)
init_vals[SI_SEM_TICKETS] = init_vals[SI_SEM_CONFIGURED_TICKETS] = 0;
init_vals[SI_SEM_REGISTERED_WORKERS] = 0;
init_vals[SI_SEM_LOCK] = 1;
init_vals[SI_SEM_TICKET_THROTTLE] = 0;

if (semctl(sem_id, 0, SETALL, init_vals) == -1) {
raise_semian_syscall_error("semctl()", errno);
Expand Down
2 changes: 2 additions & 0 deletions ext/semian/sysv_semaphores.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ typedef VALUE (*my_blocking_fn_t)(void*);
// SI_SEM_TICKETS semaphore for the tickets currently issued
// SI_SEM_CONFIGURED_TICKETS semaphore to track the desired number of tickets available for issue
// SI_SEM_REGISTERED_WORKERS semaphore for the number of workers currently registered
// SI_SEM_TICKET_THROTTLE semaphore to track the current throttle
// SI_NUM_SEMAPHORES always leave this as last entry for count to be accurate
#define FOREACH_SEMINDEX(SEMINDEX) \
SEMINDEX(SI_SEM_LOCK) \
SEMINDEX(SI_SEM_TICKETS) \
SEMINDEX(SI_SEM_CONFIGURED_TICKETS) \
SEMINDEX(SI_SEM_REGISTERED_WORKERS) \
SEMINDEX(SI_SEM_TICKET_THROTTLE) \
SEMINDEX(SI_NUM_SEMAPHORES) \

#define GENERATE_ENUM(ENUM) ENUM,
Expand Down
12 changes: 12 additions & 0 deletions lib/semian/circuit_breaker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,31 @@ def transition_to_close
log_state_transition(:closed)
@state.close!
@errors.clear
throttle(nil)
end

def transition_to_open
notify_state_transition(:open)
log_state_transition(:open)
@state.open!
throttle(1)
end

def transition_to_half_open
notify_state_transition(:half_open)
log_state_transition(:half_open)
@state.half_open!
@successes.reset
throttle(@success_count_threshold)
end

def throttle(val = nil)
resource = Semian[@name]
return if resource.nil?
bulkhead = resource.bulkhead
return if bulkhead.nil?

bulkhead.throttle(val)
end

def success_threshold_reached?
Expand Down
11 changes: 11 additions & 0 deletions test/resource_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,17 @@ def test_memsize
assert_equal 128, ObjectSpace.memsize_of(r)
end

def test_throttle
id = Time.now.strftime('%H:%M:%S.%N')
resource = create_resource(id, quota: 0.5, timeout: 0.1)
fork_workers(resource: id, count: 15, quota: 0.5, wait_for_timeout: true)
assert_equal(8, resource.tickets)
resource.throttle(1)
assert_equal(1, resource.tickets)
resource.throttle(nil)
assert_equal(8, resource.tickets)
end

def create_resource(*args)
@resources ||= []
resource = Semian::Resource.new(*args)
Expand Down

0 comments on commit 13a7241

Please sign in to comment.