Skip to content

Commit

Permalink
Add name registry
Browse files Browse the repository at this point in the history
  • Loading branch information
saleyn committed Feb 27, 2024
1 parent 44129da commit e47035e
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 34 deletions.
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
all: compile

compile clean:
rebar3 $@

test eunit:
rebar3 eunit

.PHONY: test
2 changes: 1 addition & 1 deletion c_src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ else ifeq ($(UNAME_SYS), Linux)
CXXFLAGS ?= -O3 -finline-functions -Wall
endif

CXXFLAGS += -std=c++11
CXXFLAGS += -std=c++20

CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR)
CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR)
Expand Down
133 changes: 109 additions & 24 deletions c_src/sema_nif.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
#include <atomic>
#include <mutex>
#include <map>
#include <string>
#include <cassert>
#include <cstring>
#include <memory>

#ifdef TRACE
#include <iostream>
Expand All @@ -18,6 +21,8 @@ static ERL_NIF_TERM atom_not_found;
static ERL_NIF_TERM atom_dead;
static ERL_NIF_TERM atom_cnt;
static ERL_NIF_TERM atom_max;
static ERL_NIF_TERM atom_name;
static ERL_NIF_TERM atom_undefined;

inline ERL_NIF_TERM mk_atom(ErlNifEnv *env, const char *name) {
ERL_NIF_TERM ret;
Expand All @@ -38,18 +43,32 @@ inline ERL_NIF_TERM unsigned_result(ErlNifEnv *env, unsigned ret) {
return ok_tuple(env, enif_make_uint(env, ret));
}

inline std::tuple<ERL_NIF_TERM, unsigned char*>
make_binary(ErlNifEnv* env, size_t size)
{
ERL_NIF_TERM term;
auto p = enif_make_new_binary(env, size, &term);
return std::make_tuple(term, p);
}

inline ERL_NIF_TERM make_binary(ErlNifEnv* env, std::string_view const& str)
{
auto [term, p] = make_binary(env, str.length());
memcpy(p, str.data(), str.length());
return term;
}

// this overload is needed by std::map<ErlNifPid>
inline bool operator<(const ErlNifPid& lhs, const ErlNifPid& rhs) {
return enif_compare_pids(&lhs, &rhs) < 0;
}

template<typename T>
struct enif_allocator : std::allocator<T> {
using typename std::allocator<T>::pointer;
using typename std::allocator<T>::size_type;

template<typename U>
struct rebind { typedef enif_allocator<U> other; };
//template<typename U>
//struct rebind { typedef enif_allocator<U> other; };

enif_allocator() noexcept : std::allocator<T>() {}

Expand All @@ -58,21 +77,35 @@ struct enif_allocator : std::allocator<T> {
: std::allocator<T>(u)
{}

pointer allocate(size_type size, const void *hint = 0) {
[[nodiscard]] constexpr T* allocate(size_type size) {
void *p = enif_alloc(size * sizeof(T));
if (p == 0) throw std::bad_alloc();
return static_cast<pointer>(p);
return static_cast<T*>(p);
}

void deallocate(pointer p, size_type size) {
void deallocate(T* p, size_type size) {
enif_free(p);
}
};

struct sema;

struct registry {
void add(ERL_NIF_TERM name, sema* s);
void remove(sema* s);
sema* get(ERL_NIF_TERM name);
private:
std::map<ERL_NIF_TERM, sema*> m_reg;
std::mutex m_mtx;
};

static std::unique_ptr<registry> s_registry;

struct sema {
std::atomic_uint cnt;
std::atomic_uint dead_counter;
unsigned max;
ERL_NIF_TERM name;

typedef std::pair<ErlNifMonitor, unsigned> mon_cnt_t;
std::map<
Expand All @@ -85,7 +118,18 @@ struct sema {

static ErlNifMonitor null_mon;

sema(unsigned n) : cnt(0), dead_counter(0), max(n) {}
sema(unsigned n, ERL_NIF_TERM name = 0)
: cnt(0)
, dead_counter(0)
, max(n)
, name(name ? name : atom_undefined)
{
s_registry->add(name, this);
}

~sema() {
s_registry->remove(this);
}

ERL_NIF_TERM info(ErlNifEnv *env) {
ERL_NIF_TERM keys[] = {atom_dead, atom_cnt, atom_max};
Expand Down Expand Up @@ -213,6 +257,28 @@ struct sema {

ErlNifMonitor sema::null_mon;

//-----------------------------------------------------------------------------
// registry implementation
//-----------------------------------------------------------------------------
void registry::add(ERL_NIF_TERM name, sema* s) {
if (name == 0 || name == atom_undefined) return;
std::unique_lock lock(m_mtx);
m_reg.emplace(std::make_pair(name, s));
}
void registry::remove(sema* s) {
if (!s) return;
std::unique_lock lock(m_mtx);
m_reg.erase(s->name);
}
sema* registry::get(ERL_NIF_TERM name) {
std::unique_lock lock(m_mtx);
auto it = m_reg.find(name);
return (it == m_reg.end()) ? nullptr : it->second;
}

//-----------------------------------------------------------------------------
// desctuction callbacks
//-----------------------------------------------------------------------------
static void free_sema(ErlNifEnv *env, void *obj) {
if (obj != nullptr) {
sema& x = *(sema *)obj;
Expand Down Expand Up @@ -241,6 +307,9 @@ static bool open_resource(ErlNifEnv *env) {
return SEMA != nullptr;
}

//-----------------------------------------------------------------------------
// NIF implementation
//-----------------------------------------------------------------------------
static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) {
if (!open_resource(env)) return -1;
atom_ok = mk_atom(env, "ok");
Expand All @@ -250,38 +319,59 @@ static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) {
atom_dead = mk_atom(env, "dead");
atom_cnt = mk_atom(env, "cnt");
atom_max = mk_atom(env, "max");
atom_name = mk_atom(env, "name");
atom_undefined = mk_atom(env, "undefined");

s_registry.reset(new registry());

return 0;
}

static ERL_NIF_TERM
create(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
if (argc != 1)
return enif_make_badarg(env);
assert(argc >= 1 && argc <= 2);

unsigned max;
if (!enif_get_uint(env, argv[0], &max))
return enif_make_badarg(env);

ERL_NIF_TERM name = 0;

if (argc > 1) {
if (!enif_is_map(env, argv[1]))
return enif_make_badarg(env);

if (!enif_get_map_value(env, argv[1], atom_name, &name))
return enif_raise_exception(env,
enif_make_tuple2(env, atom_error, make_binary(env, "Missing option: name")));
}

void *res = enif_alloc_resource(SEMA, sizeof(sema));
if (res == nullptr)
return enif_make_badarg(env);

ERL_NIF_TERM ret = enif_make_resource(env, res);
enif_release_resource(res);

new (res) sema(max);
new (res) sema(max, name);

return ret;
}

static sema*
get_sema(ErlNifEnv *env, ERL_NIF_TERM id) {
if (enif_is_atom(env, id))
return s_registry->get(id);

sema *res = nullptr;
return enif_get_resource(env, id, SEMA, (void **)&res) ? res : nullptr;
}

static ERL_NIF_TERM
info(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
assert(argc == 1);

sema *res = nullptr;
if (!enif_get_resource(env, argv[0], SEMA, (void **)&res))
return enif_make_badarg(env);

sema *res = get_sema(env, argv[0]);
if (res == nullptr)
return enif_make_badarg(env);

Expand All @@ -292,8 +382,8 @@ static ERL_NIF_TERM
capacity(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
assert(argc >= 1 && argc <= 2);

sema *res = nullptr;
if (!enif_get_resource(env, argv[0], SEMA, (void **)&res) || !res)
sema *res = get_sema(env, argv[0]);
if (res == nullptr)
return enif_make_badarg(env);

unsigned max = 0;
Expand All @@ -308,10 +398,7 @@ acquire(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
if (argc < 1 || argc > 2)
return enif_make_badarg(env);

sema *res = nullptr;
if (!enif_get_resource(env, argv[0], SEMA, (void **)&res))
return enif_make_badarg(env);

sema *res = get_sema(env, argv[0]);
if (res == nullptr)
return enif_make_badarg(env);

Expand All @@ -338,10 +425,7 @@ release(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
if (argc < 1 || argc > 3)
return enif_make_badarg(env);

sema *res = nullptr;
if (!enif_get_resource(env, argv[0], SEMA, (void **)&res))
return enif_make_badarg(env);

sema *res = get_sema(env, argv[0]);
if (res == nullptr)
return enif_make_badarg(env);

Expand Down Expand Up @@ -376,6 +460,7 @@ release(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {

static ErlNifFunc nif_funcs[] = {
{"create", 1, create},
{"create", 2, create},
{"info", 1, info},
{"capacity", 1, capacity},
{"capacity", 2, capacity},
Expand Down
31 changes: 22 additions & 9 deletions src/sema_nif.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

-export([
create/1,
create/2,
info/1,
capacity/1,
capacity/2,
Expand All @@ -14,6 +15,7 @@

-nifs([
create/1,
create/2,
info/1,
capacity/1,
capacity/2,
Expand Down Expand Up @@ -42,15 +44,26 @@
| {error, not_found}.

-type sema_ref() :: reference().
-type sema_id() :: sema_ref() | atom().

-export_type([sema_ref/0, acquire_ret/0, release_ret/0]).
-type sema_opts() :: #{name => atom()}.

-export_type([sema_ref/0, sema_id/0, sema_opts/0, acquire_ret/0, release_ret/0]).

% @doc Create a new semaphore with the given capacity
-spec create(Max :: pos_integer()) -> sema_ref().
create(_) -> not_loaded(?LINE).

% @doc Create a new semaphore with the given capacity
% `Opts' is a map of options:
% <ul>
% <li>`{name, Name::atom()}' semaphore name
% </ul>
-spec create(pos_integer(), sema_opts()) -> sema_ref().
create(Max, Opts) when is_integer(Max), is_map(Opts) -> not_loaded(?LINE).

% @doc Get internal properties of the semaphore resource
-spec info(Semaphore :: sema_ref()) ->
-spec info(Semaphore :: sema_id()) ->
#{
% number of units acquired
cnt := non_neg_integer(),
Expand All @@ -62,32 +75,32 @@ create(_) -> not_loaded(?LINE).
info(_) -> not_loaded(?LINE).

% @doc Get semaphore maximum capacity
-spec capacity(Semaphore :: sema_ref()) -> pos_integer().
-spec capacity(Semaphore :: sema_id()) -> pos_integer().
capacity(_) -> not_loaded(?LINE).

% @doc Set semaphore maximum capacity.
% Return old capacity.
-spec capacity(Semaphore :: sema_ref(), Max :: integer()) -> pos_integer().
-spec capacity(Semaphore :: sema_id(), Max :: integer()) -> pos_integer().
capacity(_, _Max) -> not_loaded(?LINE).

% @doc Acquire resource unit for calling process, monitor process
-spec acquire(Semaphore :: sema_ref()) -> Ret :: acquire_ret().
-spec acquire(Semaphore :: sema_id()) -> Ret :: acquire_ret().
acquire(_) -> not_loaded(?LINE).

% @doc Acquire resource Cnt units for calling process, monitor process
-spec acquire(Semaphore :: sema_ref(), Cnt :: pos_integer()) -> Ret :: acquire_ret().
-spec acquire(Semaphore :: sema_id(), Cnt :: pos_integer()) -> Ret :: acquire_ret().
acquire(_, _) -> not_loaded(?LINE).

% @doc Release resource unit acquired by calling process
-spec release(Semaphore :: sema_ref()) -> Ret :: release_ret().
-spec release(Semaphore :: sema_id()) -> Ret :: release_ret().
release(_) -> not_loaded(?LINE).

% @doc Release resource unit(s) acquired by calling/another process
-spec release(Semaphore :: sema_ref(), Arg :: pos_integer() | pid()) -> Ret :: release_ret().
-spec release(Semaphore :: sema_id(), Arg :: pos_integer() | pid()) -> Ret :: release_ret().
release(_, _) -> not_loaded(?LINE).

% @doc Release resource units acquired by another process
-spec release(Semaphore :: sema_ref(), Cnt :: pos_integer(), Pid :: pid()) -> Ret :: release_ret().
-spec release(Semaphore :: sema_id(), Cnt :: pos_integer(), Pid :: pid()) -> Ret :: release_ret().
release(_, _, _) -> not_loaded(?LINE).

% internal
Expand Down
12 changes: 12 additions & 0 deletions test/sema_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,20 @@ basic_api_test() ->

?assertEqual(3, sema_nif:capacity(S, 5)),
?assertEqual(5, sema_nif:capacity(S)),

ok.

sema_name_test() ->
S = sema_nif:create(3, #{name => sema_test}),
?assert(is_reference(S)),
?assertEqual(3, sema_nif:capacity(sema_test)),
?assertEqual(3, sema_nif:capacity(S)),
?assertEqual({ok, 1}, sema_nif:acquire(sema_test)),
?assertEqual({ok, 0}, sema_nif:release(sema_test)),
?assertEqual(#{cnt => 0, dead => 0, max => 3}, sema_nif:info(sema_test)),
?assertEqual(3, sema_nif:capacity(sema_test, 5)),
?assertEqual(5, sema_nif:capacity(sema_test)).

gc_test() ->
S = sema_nif:create(3),
?assertEqual(#{cnt => 0, dead => 0, max => 3}, sema_nif:info(S)),
Expand Down

0 comments on commit e47035e

Please sign in to comment.