diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..df3ab31 --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ +all: compile + +compile clean: + rebar3 $@ + +test eunit: + rebar3 eunit + +.PHONY: test diff --git a/README.md b/README.md index 3310b34..9c8023f 100644 --- a/README.md +++ b/README.md @@ -3,27 +3,37 @@ erlsem An OTP library implementing counting non-blocking semaphore with the ability to monitor a process that obtains a lock and to release the corresponding -resource(s) automatically in case the process exits. +resource(s) automatically when the process exits. Build ----- - $ rebar3 compile - $ rebar3 eunit +```shell +$ rebar3 compile +$ rebar3 eunit +``` Demo ---- - Eshell V13.1.3 (abort with ^G) - 1> Pid = self(). - <0.145.0> - 2> S = sema_nif:create(3). - #Ref<0.1541145699.2864316417.117910> - 3> L = [spawn(fun() -> Pid ! {self(), sema_nif:acquire(S)}, timer:sleep(10) end) || _ <- lists:seq(1, 5)]. - [<0.154.0>,<0.155.0>,<0.156.0>,<0.157.0>,<0.158.0>] - 4> [receive {P, X} -> X end || P <- L]. - [{ok,1}, - {ok,2}, - {ok,3}, - {error,full}, - {error,full}] +```erlang +Eshell V13.1.3 (abort with ^G) +1> Pid = self(). +<0.145.0> +2> S = sema_nif:create(3). +#Ref<0.1541145699.2864316417.117910> +3> L = [spawn(fun() -> Pid ! {self(), sema_nif:acquire(S)}, timer:sleep(10) end) || _ <- lists:seq(1, 5)]. +[<0.154.0>,<0.155.0>,<0.156.0>,<0.157.0>,<0.158.0>] +4> [receive {P, X} -> X end || P <- L]. +[{ok,1}, + {ok,2}, + {ok,3}, + {error,full}, + {error,full}] +5> sema_nif:create(sema, 5). +#Ref<0.1541145699.2864316445.123450> +6> sema_nif:acquire(sema). +{ok, 1} +6> sema_nif:release(sema). +{ok, 0} +``` diff --git a/c_src/Makefile b/c_src/Makefile index 8954814..74fc619 100644 --- a/c_src/Makefile +++ b/c_src/Makefile @@ -12,25 +12,28 @@ ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -eval "io:format(\"~ts\", [code:li C_SRC_DIR = $(CURDIR) C_SRC_OUTPUT ?= $(CURDIR)/../priv/$(PROJECT).so +ifeq ($(NIF_DEBUG),) + OPTIMIZE = -O3 -DNDEBUG +else + OPTIMIZE = -g -O0 +endif + # System type and C compiler/flags. UNAME_SYS := $(shell uname -s) -ifeq ($(UNAME_SYS), Darwin) +ifeq ($(UNAME_SYS),Darwin) CC ?= cc - CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes - CXXFLAGS ?= -O3 -finline-functions -Wall + CXXFLAGS ?= -finline-functions -Wall LDFLAGS ?= -flat_namespace -undefined suppress -else ifeq ($(UNAME_SYS), FreeBSD) +else ifeq ($(UNAME_SYS),FreeBSD) CC ?= cc - CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes - CXXFLAGS ?= -O3 -finline-functions -Wall -else ifeq ($(UNAME_SYS), Linux) + CXXFLAGS ?= -finline-functions -Wall +else ifeq ($(UNAME_SYS),Linux) CC ?= gcc - CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes - CXXFLAGS ?= -O3 -finline-functions -Wall + CXXFLAGS ?= -finline-functions -Wall endif -CXXFLAGS += -std=c++11 +CXXFLAGS += -std=c++20 $(OPTIMIZE) CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) diff --git a/c_src/sema_nif.cpp b/c_src/sema_nif.cpp index 1e1318a..43a471c 100644 --- a/c_src/sema_nif.cpp +++ b/c_src/sema_nif.cpp @@ -3,7 +3,10 @@ #include #include #include +#include #include +#include +#include #ifdef TRACE #include @@ -18,6 +21,9 @@ static ERL_NIF_TERM atom_not_found; static ERL_NIF_TERM atom_dead; static ERL_NIF_TERM atom_cnt; static ERL_NIF_TERM atom_max; +static ERL_NIF_TERM atom_name; +static ERL_NIF_TERM atom_undefined; +static ERL_NIF_TERM atom_duplicate_name; inline ERL_NIF_TERM mk_atom(ErlNifEnv *env, const char *name) { ERL_NIF_TERM ret; @@ -38,6 +44,21 @@ inline ERL_NIF_TERM unsigned_result(ErlNifEnv *env, unsigned ret) { return ok_tuple(env, enif_make_uint(env, ret)); } +inline std::tuple +make_binary(ErlNifEnv* env, size_t size) +{ + ERL_NIF_TERM term; + auto p = enif_make_new_binary(env, size, &term); + return std::make_tuple(term, p); +} + +inline ERL_NIF_TERM make_binary(ErlNifEnv* env, std::string_view const& str) +{ + auto [term, p] = make_binary(env, str.length()); + memcpy(p, str.data(), str.length()); + return term; +} + // this overload is needed by std::map inline bool operator<(const ErlNifPid& lhs, const ErlNifPid& rhs) { return enif_compare_pids(&lhs, &rhs) < 0; @@ -45,12 +66,8 @@ inline bool operator<(const ErlNifPid& lhs, const ErlNifPid& rhs) { template struct enif_allocator : std::allocator { - using typename std::allocator::pointer; using typename std::allocator::size_type; - template - struct rebind { typedef enif_allocator other; }; - enif_allocator() noexcept : std::allocator() {} template @@ -58,21 +75,38 @@ struct enif_allocator : std::allocator { : std::allocator(u) {} - pointer allocate(size_type size, const void *hint = 0) { + [[nodiscard]] constexpr T* allocate(size_type size) { void *p = enif_alloc(size * sizeof(T)); if (p == 0) throw std::bad_alloc(); - return static_cast(p); + return static_cast(p); } - void deallocate(pointer p, size_type size) { + void deallocate(T* p, size_type size) { enif_free(p); } }; +struct sema; + +struct registry { + bool add(ERL_NIF_TERM name, sema* s); + void remove(sema* s); + sema* get(ERL_NIF_TERM name); +private: + std::map m_reg; + std::mutex m_mtx; +}; + + +static registry* get_registry(ErlNifEnv* env) { + return static_cast(enif_priv_data(env)); +} + struct sema { std::atomic_uint cnt; std::atomic_uint dead_counter; - unsigned max; + unsigned max; + ERL_NIF_TERM name; typedef std::pair mon_cnt_t; std::map< @@ -85,7 +119,12 @@ struct sema { static ErlNifMonitor null_mon; - sema(unsigned n) : cnt(0), dead_counter(0), max(n) {} + sema(ErlNifEnv* env, unsigned n, ERL_NIF_TERM name = 0) + : cnt(0) + , dead_counter(0) + , max(n) + , name(name ? name : atom_undefined) + {} ERL_NIF_TERM info(ErlNifEnv *env) { ERL_NIF_TERM keys[] = {atom_dead, atom_cnt, atom_max}; @@ -213,6 +252,34 @@ struct sema { ErlNifMonitor sema::null_mon; +class sema_already_registered : public std::exception {}; + +//----------------------------------------------------------------------------- +// registry implementation +//----------------------------------------------------------------------------- +bool registry::add(ERL_NIF_TERM name, sema* s) { + if (name == 0 || name == atom_undefined) return false; + std::unique_lock lock(m_mtx); + auto it = m_reg.find(name); + if (it != m_reg.end()) + return false; + m_reg.emplace(std::make_pair(name, s)); + return true; +} +void registry::remove(sema* s) { + if (!s) return; + std::unique_lock lock(m_mtx); + m_reg.erase(s->name); +} +sema* registry::get(ERL_NIF_TERM name) { + std::unique_lock lock(m_mtx); + auto it = m_reg.find(name); + return (it == m_reg.end()) ? nullptr : it->second; +} + +//----------------------------------------------------------------------------- +// desctuction callbacks +//----------------------------------------------------------------------------- static void free_sema(ErlNifEnv *env, void *obj) { if (obj != nullptr) { sema& x = *(sema *)obj; @@ -220,6 +287,10 @@ static void free_sema(ErlNifEnv *env, void *obj) { auto n = x.cnt.load(std::memory_order_acquire); std::cout << "free> cnt: " << n << ", max: " << x.max << "\r\n"; #endif + auto reg = get_registry(env); + assert(reg); + reg->remove(&x); + x.~sema(); } } @@ -241,47 +312,64 @@ static bool open_resource(ErlNifEnv *env) { return SEMA != nullptr; } -static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { - if (!open_resource(env)) return -1; - atom_ok = mk_atom(env, "ok"); - atom_error = mk_atom(env, "error"); - atom_full = mk_atom(env, "full"); - atom_not_found = mk_atom(env, "not_found"); - atom_dead = mk_atom(env, "dead"); - atom_cnt = mk_atom(env, "cnt"); - atom_max = mk_atom(env, "max"); - return 0; -} +//----------------------------------------------------------------------------- +// NIF implementation +//----------------------------------------------------------------------------- static ERL_NIF_TERM create(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { - if (argc != 1) - return enif_make_badarg(env); + assert(argc >= 1 && argc <= 2); + int pos = argc == 1 ? 0 : 1; unsigned max; - if (!enif_get_uint(env, argv[0], &max)) + if (!enif_get_uint(env, argv[pos], &max)) return enif_make_badarg(env); - void *res = enif_alloc_resource(SEMA, sizeof(sema)); + ERL_NIF_TERM name = 0; + + if (argc > 1) { + if (!enif_is_atom(env, argv[0])) + return enif_make_badarg(env); + name = argv[0]; + } + + sema *res = static_cast(enif_alloc_resource(SEMA, sizeof(sema))); if (res == nullptr) return enif_make_badarg(env); - ERL_NIF_TERM ret = enif_make_resource(env, res); - enif_release_resource(res); + ERL_NIF_TERM ret; - new (res) sema(max); + auto reg = get_registry(env); + assert(reg); + if (name && name != atom_undefined && !reg->add(name, res)) + ret = enif_raise_exception(env, atom_duplicate_name); + else { + new (res) sema(env, max, name); + ret = enif_make_resource(env, res); + } + + enif_release_resource(res); return ret; } +static sema* +get_sema(ErlNifEnv *env, ERL_NIF_TERM id) { + if (enif_is_atom(env, id)) { + auto reg = get_registry(env); + assert(reg); + return reg->get(id); + } + + sema *res = nullptr; + return enif_get_resource(env, id, SEMA, (void **)&res) ? res : nullptr; +} + static ERL_NIF_TERM info(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { assert(argc == 1); - sema *res = nullptr; - if (!enif_get_resource(env, argv[0], SEMA, (void **)&res)) - return enif_make_badarg(env); - + sema *res = get_sema(env, argv[0]); if (res == nullptr) return enif_make_badarg(env); @@ -292,8 +380,8 @@ static ERL_NIF_TERM capacity(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { assert(argc >= 1 && argc <= 2); - sema *res = nullptr; - if (!enif_get_resource(env, argv[0], SEMA, (void **)&res) || !res) + sema *res = get_sema(env, argv[0]); + if (res == nullptr) return enif_make_badarg(env); unsigned max = 0; @@ -308,10 +396,7 @@ acquire(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { if (argc < 1 || argc > 2) return enif_make_badarg(env); - sema *res = nullptr; - if (!enif_get_resource(env, argv[0], SEMA, (void **)&res)) - return enif_make_badarg(env); - + sema *res = get_sema(env, argv[0]); if (res == nullptr) return enif_make_badarg(env); @@ -338,10 +423,7 @@ release(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { if (argc < 1 || argc > 3) return enif_make_badarg(env); - sema *res = nullptr; - if (!enif_get_resource(env, argv[0], SEMA, (void **)&res)) - return enif_make_badarg(env); - + sema *res = get_sema(env, argv[0]); if (res == nullptr) return enif_make_badarg(env); @@ -374,8 +456,35 @@ release(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { return res->release(env, pid, n); } +//----------------------------------------------------------------------------- +// NIF loading/unloading +//----------------------------------------------------------------------------- +static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { + if (!open_resource(env)) return -1; + atom_ok = mk_atom(env, "ok"); + atom_error = mk_atom(env, "error"); + atom_full = mk_atom(env, "full"); + atom_not_found = mk_atom(env, "not_found"); + atom_dead = mk_atom(env, "dead"); + atom_cnt = mk_atom(env, "cnt"); + atom_max = mk_atom(env, "max"); + atom_name = mk_atom(env, "name"); + atom_undefined = mk_atom(env, "undefined"); + atom_duplicate_name = mk_atom(env, "duplicate_name"); + + *priv_data = static_cast(new registry()); + + return 0; +} + +static void unload(ErlNifEnv *env, void *priv_data) { + if (priv_data) + delete static_cast(priv_data); +} + static ErlNifFunc nif_funcs[] = { {"create", 1, create}, + {"create", 2, create}, {"info", 1, info}, {"capacity", 1, capacity}, {"capacity", 2, capacity}, @@ -386,4 +495,4 @@ static ErlNifFunc nif_funcs[] = { {"release", 3, release} }; -ERL_NIF_INIT(sema_nif, nif_funcs, &load, nullptr, nullptr, nullptr); +ERL_NIF_INIT(sema_nif, nif_funcs, &load, nullptr, nullptr, unload); diff --git a/src/sema_nif.erl b/src/sema_nif.erl index 3d6f67e..e58c9f2 100644 --- a/src/sema_nif.erl +++ b/src/sema_nif.erl @@ -2,6 +2,7 @@ -export([ create/1, + create/2, info/1, capacity/1, capacity/2, @@ -14,6 +15,7 @@ -nifs([ create/1, + create/2, info/1, capacity/1, capacity/2, @@ -42,15 +44,22 @@ | {error, not_found}. -type sema_ref() :: reference(). +-type sema_id() :: sema_ref() | atom(). --export_type([sema_ref/0, acquire_ret/0, release_ret/0]). +-type sema_name() :: atom(). + +-export_type([sema_ref/0, sema_id/0, acquire_ret/0, release_ret/0]). % @doc Create a new semaphore with the given capacity -spec create(Max :: pos_integer()) -> sema_ref(). create(_) -> not_loaded(?LINE). +% @doc Create a named semaphore with the given capacity +-spec create(sema_name(), pos_integer()) -> sema_ref(). +create(Name, Max) when is_atom(Name), is_integer(Max) -> not_loaded(?LINE). + % @doc Get internal properties of the semaphore resource --spec info(Semaphore :: sema_ref()) -> +-spec info(Semaphore :: sema_id()) -> #{ % number of units acquired cnt := non_neg_integer(), @@ -62,32 +71,32 @@ create(_) -> not_loaded(?LINE). info(_) -> not_loaded(?LINE). % @doc Get semaphore maximum capacity --spec capacity(Semaphore :: sema_ref()) -> pos_integer(). +-spec capacity(Semaphore :: sema_id()) -> pos_integer(). capacity(_) -> not_loaded(?LINE). % @doc Set semaphore maximum capacity. % Return old capacity. --spec capacity(Semaphore :: sema_ref(), Max :: integer()) -> pos_integer(). +-spec capacity(Semaphore :: sema_id(), Max :: integer()) -> pos_integer(). capacity(_, _Max) -> not_loaded(?LINE). % @doc Acquire resource unit for calling process, monitor process --spec acquire(Semaphore :: sema_ref()) -> Ret :: acquire_ret(). +-spec acquire(Semaphore :: sema_id()) -> Ret :: acquire_ret(). acquire(_) -> not_loaded(?LINE). % @doc Acquire resource Cnt units for calling process, monitor process --spec acquire(Semaphore :: sema_ref(), Cnt :: pos_integer()) -> Ret :: acquire_ret(). +-spec acquire(Semaphore :: sema_id(), Cnt :: pos_integer()) -> Ret :: acquire_ret(). acquire(_, _) -> not_loaded(?LINE). % @doc Release resource unit acquired by calling process --spec release(Semaphore :: sema_ref()) -> Ret :: release_ret(). +-spec release(Semaphore :: sema_id()) -> Ret :: release_ret(). release(_) -> not_loaded(?LINE). % @doc Release resource unit(s) acquired by calling/another process --spec release(Semaphore :: sema_ref(), Arg :: pos_integer() | pid()) -> Ret :: release_ret(). +-spec release(Semaphore :: sema_id(), Arg :: pos_integer() | pid()) -> Ret :: release_ret(). release(_, _) -> not_loaded(?LINE). % @doc Release resource units acquired by another process --spec release(Semaphore :: sema_ref(), Cnt :: pos_integer(), Pid :: pid()) -> Ret :: release_ret(). +-spec release(Semaphore :: sema_id(), Cnt :: pos_integer(), Pid :: pid()) -> Ret :: release_ret(). release(_, _, _) -> not_loaded(?LINE). % internal diff --git a/test/sema_tests.erl b/test/sema_tests.erl index 4a75706..49d7f92 100644 --- a/test/sema_tests.erl +++ b/test/sema_tests.erl @@ -68,8 +68,20 @@ basic_api_test() -> ?assertEqual(3, sema_nif:capacity(S, 5)), ?assertEqual(5, sema_nif:capacity(S)), + ok. +sema_name_test() -> + S = sema_nif:create(sema_test, 3), + ?assert(is_reference(S)), + ?assertEqual(3, sema_nif:capacity(sema_test)), + ?assertEqual(3, sema_nif:capacity(S)), + ?assertEqual({ok, 1}, sema_nif:acquire(sema_test)), + ?assertEqual({ok, 0}, sema_nif:release(sema_test)), + ?assertEqual(#{cnt => 0, dead => 0, max => 3}, sema_nif:info(sema_test)), + ?assertEqual(3, sema_nif:capacity(sema_test, 5)), + ?assertEqual(5, sema_nif:capacity(sema_test)). + gc_test() -> S = sema_nif:create(3), ?assertEqual(#{cnt => 0, dead => 0, max => 3}, sema_nif:info(S)),